You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/01/08 16:36:18 UTC
[10/13] tajo git commit: TAJO-1288: Refactoring
org.apache.tajo.master package.
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
deleted file mode 100644
index 559fc14..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-
-import com.google.gson.annotations.Expose;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.ClientProtos.QueryInfoProto;
-import org.apache.tajo.json.GsonObject;
-import org.apache.tajo.util.TajoIdUtils;
-import org.apache.tajo.util.history.History;
-
-/**
- * Describes one query: its id, SQL text, state, progress, timing and the
- * address of the QueryMaster serving it.
- * <p>
- * An instance can be rendered as JSON ({@link #toJson()}), rebuilt from JSON
- * ({@link #fromJson(String)}) and converted to a {@link QueryInfoProto}
- * ({@link #getProto()}).
- */
-public class QueryInfo implements GsonObject, History {
-  // Not annotated with @Expose; fromJson() rebuilds it from queryIdStr.
-  private QueryId queryId;
-  @Expose
-  private QueryContext context;
-  @Expose
-  private String sql;
-  @Expose
-  private volatile TajoProtos.QueryState queryState;
-  @Expose
-  private volatile float progress;
-  @Expose
-  private volatile long startTime;
-  @Expose
-  private volatile long finishTime;
-  @Expose
-  private String lastMessage;
-  @Expose
-  private String hostNameOfQM;
-  @Expose
-  private int queryMasterPort;
-  @Expose
-  private int queryMasterClientPort;
-  @Expose
-  private int queryMasterInfoPort;
-  @Expose
-  private String queryIdStr;
-  @Expose
-  private volatile TableDesc resultDesc;
-
-  private String jsonExpr;
-
-  /**
-   * Creates an info object that carries only the query id; the query context,
-   * SQL text and JSON expression are all {@code null}.
-   *
-   * @param queryId id of the query
-   */
-  public QueryInfo(QueryId queryId) {
-    this(queryId, null, null, null);
-  }
-
-  /**
-   * Creates an info object in the {@code QUERY_MASTER_INIT} state.
-   *
-   * @param queryId      id of the query; its string form is stored as well
-   * @param queryContext query context, may be {@code null}
-   * @param sql          SQL text, may be {@code null}
-   * @param jsonExpr     JSON expression of the query, may be {@code null}
-   */
-  public QueryInfo(QueryId queryId, QueryContext queryContext, String sql, String jsonExpr) {
-    this.queryId = queryId;
-    this.queryIdStr = queryId.toString();
-    this.context = queryContext;
-    this.sql = sql;
-    this.jsonExpr = jsonExpr;
-
-    this.queryState = TajoProtos.QueryState.QUERY_MASTER_INIT;
-  }
-
-  public QueryId getQueryId() {
-    return queryId;
-  }
-
-  /** @return the query context, or {@code null} if none was supplied */
-  public QueryContext getQueryContext() {
-    return context;
-  }
-
-  public String getSql() {
-    return sql;
-  }
-
-  public String getQueryMasterHost() {
-    return hostNameOfQM;
-  }
-
-  /** Sets the host name of the QueryMaster (see {@link #getQueryMasterHost()}). */
-  public void setQueryMaster(String hostName) {
-    this.hostNameOfQM = hostName;
-  }
-
-  public int getQueryMasterInfoPort() {
-    return queryMasterInfoPort;
-  }
-
-  public void setQueryMasterInfoPort(int queryMasterInfoPort) {
-    this.queryMasterInfoPort = queryMasterInfoPort;
-  }
-
-  public void setQueryMasterPort(int port) {
-    this.queryMasterPort = port;
-  }
-
-  public int getQueryMasterPort() {
-    return queryMasterPort;
-  }
-
-  // The lower-case 'c' in the name is kept as-is: existing callers use this spelling.
-  public void setQueryMasterclientPort(int port) {
-    queryMasterClientPort = port;
-  }
-
-  public int getQueryMasterClientPort() {
-    return queryMasterClientPort;
-  }
-
-  public TajoProtos.QueryState getQueryState() {
-    return queryState;
-  }
-
-  public void setQueryState(TajoProtos.QueryState queryState) {
-    this.queryState = queryState;
-  }
-
-  public long getStartTime() {
-    return startTime;
-  }
-
-  public void setStartTime(long startTime) {
-    this.startTime = startTime;
-  }
-
-  public long getFinishTime() {
-    return finishTime;
-  }
-
-  public void setFinishTime(long finishTime) {
-    this.finishTime = finishTime;
-  }
-
-  public String getLastMessage() {
-    return lastMessage;
-  }
-
-  public void setLastMessage(String lastMessage) {
-    this.lastMessage = lastMessage;
-  }
-
-  public float getProgress() {
-    return progress;
-  }
-
-  public void setProgress(float progress) {
-    this.progress = progress;
-  }
-
-  public void setResultDesc(TableDesc result) {
-    this.resultDesc = result;
-  }
-
-  /** @return {@code true} if a result table description has been set */
-  public boolean hasResultdesc() {
-    return resultDesc != null;
-  }
-
-  public TableDesc getResultDesc() {
-    return resultDesc;
-  }
-
-  @Override
-  public String toString() {
-    return queryId.toString() + ",state=" + queryState +",progress=" + progress + ", queryMaster="
-        + getQueryMasterHost();
-  }
-
-  public String getJsonExpr() {
-    return jsonExpr;
-  }
-
-  @Override
-  public String toJson() {
-    return CoreGsonHelper.toJson(this, QueryInfo.class);
-  }
-
-  @Override
-  public HistoryType getHistoryType() {
-    return HistoryType.QUERY_SUMMARY;
-  }
-
-  /**
-   * Rebuilds a {@code QueryInfo} from JSON and restores {@link #getQueryId()}
-   * by parsing the stored string form of the id.
-   *
-   * @param json JSON text to parse
-   * @return the reconstructed instance
-   */
-  public static QueryInfo fromJson(String json) {
-    QueryInfo queryInfo = CoreGsonHelper.fromJson(json, QueryInfo.class);
-    queryInfo.queryId = TajoIdUtils.parseQueryId(queryInfo.queryIdStr);
-    return queryInfo;
-  }
-
-  public String getQueryIdStr() {
-    return queryIdStr;
-  }
-
-  /**
-   * Converts this object to its protobuf form. Fields that may legitimately be
-   * {@code null} (context, result description, SQL, last message, QueryMaster
-   * host) are only written when present.
-   *
-   * @return the protobuf representation
-   */
-  public QueryInfoProto getProto() {
-    QueryInfoProto.Builder builder = QueryInfoProto.newBuilder();
-
-    builder.setQueryId(queryId.toString())
-        .setQueryState(queryState)
-        .setProgress(progress)
-        .setStartTime(startTime)
-        .setFinishTime(finishTime)
-        .setQueryMasterPort(queryMasterPort)
-        .setQueryMasterClientPort(queryMasterClientPort)
-        .setQueryMasterInfoPort(queryMasterInfoPort);
-
-    // The single-argument constructor leaves the context null; dereferencing it
-    // unconditionally threw a NullPointerException for such instances.
-    // NOTE(review): assumes contextVars is an optional proto field - confirm.
-    if (context != null) {
-      builder.setContextVars(context.getProto());
-    }
-
-    // Read the volatile field once so the null check and the use agree.
-    TableDesc currentResultDesc = resultDesc;
-    if (currentResultDesc != null) {
-      builder.setResultDesc(currentResultDesc.getProto());
-    }
-
-    if (sql != null) {
-      builder.setSql(sql);
-    }
-
-    if (lastMessage != null) {
-      builder.setLastMessage(lastMessage);
-    }
-
-    if (hostNameOfQM != null) {
-      builder.setHostNameOfQM(hostNameOfQM);
-    }
-
-    return builder.build();
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
deleted file mode 100644
index ce30ec7..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import org.apache.hadoop.yarn.event.AbstractEvent;
-
-/**
- * Event about a query job. It pairs an event {@link Type} with the
- * {@link QueryInfo} of the query it refers to.
- */
-public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> {
-  /** Kinds of query job events. */
-  public enum Type {
-    QUERY_JOB_START,
-    QUERY_JOB_HEARTBEAT,
-    QUERY_JOB_FINISH,
-    QUERY_JOB_STOP,
-    QUERY_MASTER_START,
-    QUERY_MASTER_STOP,
-    QUERY_JOB_KILL
-  }
-
-  private final QueryInfo queryInfo;
-
-  /**
-   * @param type      kind of event
-   * @param queryInfo information about the query the event refers to
-   */
-  public QueryJobEvent(Type type, QueryInfo queryInfo) {
-    super(type);
-    this.queryInfo = queryInfo;
-  }
-
-  /** @return the query information passed to the constructor */
-  public QueryInfo getQueryInfo() {
-    return queryInfo;
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
deleted file mode 100644
index 13f6456..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.session.Session;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.scheduler.SimpleFifoScheduler;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Keeps track of queries from submission until they are stopped.
- * <p>
- * {@link #createNewQueryJob} registers a query in the submitted map and hands
- * it to the scheduler; {@link #startQueryJob(QueryId)} moves it to the running
- * map and starts it; {@link #stopQuery(QueryId)} removes it from both maps and
- * folds its run time into the min / max / average execution-time counters.
- */
-public class QueryJobManager extends CompositeService {
-  private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName());
-
-  // TajoMaster Context
-  private final TajoMaster.MasterContext masterContext;
-
-  private AsyncDispatcher dispatcher;
-
-  private SimpleFifoScheduler scheduler;
-
-  private final Map<QueryId, QueryInProgress> submittedQueries = Maps.newConcurrentMap();
-
-  private final Map<QueryId, QueryInProgress> runningQueries = Maps.newConcurrentMap();
-
-  // Guards the compound read-modify-write of the four statistics counters below.
-  private final Object statsLock = new Object();
-
-  private final AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE);
-  private final AtomicLong maxExecutionTime = new AtomicLong();
-  private final AtomicLong avgExecutionTime = new AtomicLong();
-  private final AtomicLong executedQuerySize = new AtomicLong();
-
-  public QueryJobManager(final TajoMaster.MasterContext masterContext) {
-    super(QueryJobManager.class.getName());
-    this.masterContext = masterContext;
-  }
-
-  /**
-   * Creates the dispatcher and the scheduler and registers the event handler.
-   *
-   * @throws RuntimeException wrapping the original failure if set-up fails
-   */
-  @Override
-  public void init(Configuration conf) {
-    try {
-      this.dispatcher = new AsyncDispatcher();
-      addService(this.dispatcher);
-
-      this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler());
-
-      this.scheduler = new SimpleFifoScheduler(this);
-    } catch (Exception e) {
-      catchException(null, e);
-      // Previously the null query id made catchException() itself throw a
-      // NullPointerException, hiding the real cause. Fail with the cause attached.
-      throw new RuntimeException(e);
-    }
-
-    super.init(conf);
-  }
-
-  @Override
-  public void stop() {
-    synchronized(runningQueries) {
-      for(QueryInProgress eachQueryInProgress: runningQueries.values()) {
-        eachQueryInProgress.stop();
-      }
-    }
-    // The scheduler is null when init() failed before creating it.
-    if (this.scheduler != null) {
-      this.scheduler.stop();
-    }
-    super.stop();
-  }
-
-  @Override
-  public void start() {
-    this.scheduler.start();
-    super.start();
-  }
-
-  public EventHandler getEventHandler() {
-    return dispatcher.getEventHandler();
-  }
-
-  /** @return an unmodifiable view of the queries that are submitted but not started */
-  public Collection<QueryInProgress> getSubmittedQueries() {
-    synchronized (submittedQueries){
-      return Collections.unmodifiableCollection(submittedQueries.values());
-    }
-  }
-
-  /** @return an unmodifiable view of the queries that have been started */
-  public Collection<QueryInProgress> getRunningQueries() {
-    synchronized (runningQueries){
-      return Collections.unmodifiableCollection(runningQueries.values());
-    }
-  }
-
-  /**
-   * Reads finished queries from the master's history reader.
-   *
-   * @return the finished queries, or an empty list if reading fails
-   */
-  public synchronized Collection<QueryInfo> getFinishedQueries() {
-    try {
-      return this.masterContext.getHistoryReader().getQueries(null);
-    } catch (Throwable e) {
-      // Pass the throwable as the second argument so the stack trace is logged.
-      LOG.error(e.getMessage(), e);
-      return Lists.newArrayList();
-    }
-  }
-
-
-  /**
-   * Reads one finished query from the master's history reader.
-   *
-   * @return the query info, or {@code null} if reading fails
-   */
-  public synchronized QueryInfo getFinishedQuery(QueryId queryId) {
-    try {
-      return this.masterContext.getHistoryReader().getQueryInfo(queryId.toString());
-    } catch (Throwable e) {
-      LOG.error(e.getMessage(), e);
-      return null;
-    }
-  }
-
-  /**
-   * Creates a new query with a fresh id, records it as submitted and hands it
-   * to the scheduler.
-   *
-   * @return the info object of the new query
-   */
-  public QueryInfo createNewQueryJob(Session session, QueryContext queryContext, String sql,
-                                     String jsonExpr, LogicalRootNode plan)
-      throws Exception {
-    QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
-    QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql,
-        jsonExpr, plan);
-
-    synchronized (submittedQueries) {
-      queryInProgress.getQueryInfo().setQueryMaster("");
-      submittedQueries.put(queryInProgress.getQueryId(), queryInProgress);
-    }
-
-    scheduler.addQuery(queryInProgress);
-    return queryInProgress.getQueryInfo();
-  }
-
-  /**
-   * Moves a submitted query to the running map, initializes and starts it, and
-   * asks it to start its QueryMaster. If that last step reports failure the
-   * query is stopped again.
-   *
-   * @param queryId id of a previously submitted query
-   * @return the info object of the query
-   * @throws IllegalStateException if the query is not (or no longer) submitted
-   */
-  public QueryInfo startQueryJob(QueryId queryId) throws Exception {
-
-    QueryInProgress queryInProgress;
-
-    synchronized (submittedQueries) {
-      queryInProgress = submittedQueries.remove(queryId);
-    }
-
-    // stopQuery() may already have removed the query (e.g. it was killed while
-    // still queued); report that clearly instead of failing with a bare NPE below.
-    if (queryInProgress == null) {
-      throw new IllegalStateException("No submitted query to start: " + queryId);
-    }
-
-    synchronized (runningQueries) {
-      runningQueries.put(queryInProgress.getQueryId(), queryInProgress);
-    }
-
-    addService(queryInProgress);
-    queryInProgress.init(getConfig());
-    queryInProgress.start();
-
-    if (!queryInProgress.startQueryMaster()) {
-      stopQuery(queryId);
-    }
-
-    return queryInProgress.getQueryInfo();
-  }
-
-  public TajoMaster.MasterContext getMasterContext() {
-    return masterContext;
-  }
-
-  /**
-   * Routes query job events: a stop event stops the query; any other event is
-   * forwarded to a started query; a kill event for a query that has not started
-   * removes it from the scheduler, marks it killed and stops it.
-   */
-  class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
-    @Override
-    public void handle(QueryJobEvent event) {
-      QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId());
-      if(queryInProgress == null) {
-        LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
-        return;
-      }
-
-      if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
-        stopQuery(event.getQueryInfo().getQueryId());
-      } else if (queryInProgress.isStarted()) {
-        queryInProgress.getEventHandler().handle(event);
-      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
-        scheduler.removeQuery(queryInProgress.getQueryId());
-        queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
-
-        stopQuery(queryInProgress.getQueryId());
-      }
-    }
-  }
-
-  /**
-   * Looks a query up among the submitted queries first, then among the running ones.
-   *
-   * @return the query, or {@code null} if it is in neither map
-   */
-  public QueryInProgress getQueryInProgress(QueryId queryId) {
-    QueryInProgress queryInProgress;
-    synchronized (submittedQueries) {
-      queryInProgress = submittedQueries.get(queryId);
-    }
-
-    if (queryInProgress == null) {
-      synchronized (runningQueries) {
-        queryInProgress = runningQueries.get(queryId);
-      }
-    }
-    return queryInProgress;
-  }
-
-  /**
-   * Stops a query, removes it from both maps, records its execution time in the
-   * statistics and unregisters it as a child service. Logs a warning if the
-   * query is unknown.
-   */
-  public void stopQuery(QueryId queryId) {
-    LOG.info("Stop QueryInProgress:" + queryId);
-    QueryInProgress queryInProgress = getQueryInProgress(queryId);
-    if(queryInProgress != null) {
-      queryInProgress.stop();
-      synchronized(submittedQueries) {
-        submittedQueries.remove(queryId);
-      }
-
-      synchronized(runningQueries) {
-        runningQueries.remove(queryId);
-      }
-
-      QueryInfo queryInfo = queryInProgress.getQueryInfo();
-      recordExecutionTime(queryInfo.getFinishTime() - queryInfo.getStartTime());
-      removeService(queryInProgress);
-    } else {
-      LOG.warn("No QueryInProgress while query stopping: " + queryId);
-    }
-  }
-
-  /**
-   * Folds one execution time into the min / max / average counters and bumps
-   * the executed-query count. The running average is derived from the count,
-   * so a previous total of zero no longer discards the earlier samples.
-   */
-  private void recordExecutionTime(long executionTime) {
-    synchronized (statsLock) {
-      if (executionTime < minExecutionTime.get()) {
-        minExecutionTime.set(executionTime);
-      }
-
-      if (executionTime > maxExecutionTime.get()) {
-        maxExecutionTime.set(executionTime);
-      }
-
-      long executedSoFar = executedQuerySize.get();
-      long totalExecutionTime = executedSoFar * avgExecutionTime.get();
-      avgExecutionTime.set((totalExecutionTime + executionTime) / (executedSoFar + 1));
-      executedQuerySize.incrementAndGet();
-    }
-  }
-
-  /** @return the shortest recorded execution time, or 0 if no query has been stopped yet */
-  public long getMinExecutionTime() {
-    if (getExecutedQuerySize() == 0) return 0;
-    return minExecutionTime.get();
-  }
-
-  public long getMaxExecutionTime() {
-    return maxExecutionTime.get();
-  }
-
-  public long getAvgExecutionTime() {
-    return avgExecutionTime.get();
-  }
-
-  /** @return the number of queries whose execution time has been recorded */
-  public long getExecutedQuerySize() {
-    return executedQuerySize.get();
-  }
-
-  /**
-   * Logs an exception and, when it belongs to a known running query, passes it
-   * on to that query.
-   *
-   * @param queryId id of the affected query, or {@code null} if there is none
-   * @param e       the exception to report
-   */
-  private void catchException(QueryId queryId, Exception e) {
-    LOG.error(e.getMessage(), e);
-    if (queryId == null) {
-      // The concurrent map rejects null keys, so there is nothing to look up.
-      return;
-    }
-    QueryInProgress queryInProgress = runningQueries.get(queryId);
-    if (queryInProgress != null) {
-      queryInProgress.catchException(e);
-    }
-  }
-
-  /**
-   * Turns a heartbeat for a known query into a {@code QUERY_JOB_HEARTBEAT}
-   * event. Heartbeats for unknown queries are ignored.
-   *
-   * @return always {@code null}
-   */
-  public synchronized TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat(
-      TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
-    QueryInProgress queryInProgress = getQueryInProgress(new QueryId(queryHeartbeat.getQueryId()));
-    if(queryInProgress == null) {
-      return null;
-    }
-
-    QueryInfo queryInfo = makeQueryInfoFromHeartbeat(queryHeartbeat);
-    getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_HEARTBEAT, queryInfo));
-
-    return null;
-  }
-
-  /**
-   * Copies the connection info, status message, state and progress of a
-   * heartbeat into a new {@link QueryInfo}; finish time and result description
-   * are copied only when the heartbeat carries them.
-   */
-  private QueryInfo makeQueryInfoFromHeartbeat(TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
-    QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId()));
-    WorkerConnectionInfo connectionInfo = new WorkerConnectionInfo(queryHeartbeat.getConnectionInfo());
-
-    queryInfo.setQueryMaster(connectionInfo.getHost());
-    queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort());
-    queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort());
-    queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
-    queryInfo.setQueryState(queryHeartbeat.getState());
-    queryInfo.setProgress(queryHeartbeat.getQueryProgress());
-
-    if (queryHeartbeat.hasQueryFinishTime()) {
-      queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime());
-    }
-
-    if (queryHeartbeat.hasResultDesc()) {
-      queryInfo.setResultDesc(new TableDesc(queryHeartbeat.getResultDesc()));
-    }
-
-    return queryInfo;
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
deleted file mode 100644
index 641de78..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ /dev/null
@@ -1,631 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tajo.*;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.global.GlobalPlanner;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.event.QueryStartEvent;
-import org.apache.tajo.master.event.QueryStopEvent;
-import org.apache.tajo.rpc.CallFuture;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.history.QueryHistory;
-import org.apache.tajo.worker.TajoWorker;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeat;
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse;
-
-// TODO - when exception, send error status to QueryJobManager
-public class QueryMaster extends CompositeService implements EventHandler {
- private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName());
-
- private int querySessionTimeout;
-
- private Clock clock;
-
- private AsyncDispatcher dispatcher;
-
- private GlobalPlanner globalPlanner;
-
- private TajoConf systemConf;
-
- private Map<QueryId, QueryMasterTask> queryMasterTasks = Maps.newConcurrentMap();
-
- private Map<QueryId, QueryMasterTask> finishedQueryMasterTasks = Maps.newConcurrentMap();
-
- private ClientSessionTimeoutCheckThread clientSessionTimeoutCheckThread;
-
- private AtomicBoolean queryMasterStop = new AtomicBoolean(false);
-
- private QueryMasterContext queryMasterContext;
-
- private QueryContext queryContext;
-
- private QueryHeartbeatThread queryHeartbeatThread;
-
- private FinishedQueryMasterTaskCleanThread finishedQueryMasterTaskCleanThread;
-
- private TajoWorker.WorkerContext workerContext;
-
- private RpcConnectionPool connPool;
-
- private ExecutorService eventExecutor;
-
- /**
-  * Creates a QueryMaster service named after this class.
-  *
-  * @param workerContext context of the worker this QueryMaster belongs to
-  */
- public QueryMaster(TajoWorker.WorkerContext workerContext) {
-   super(QueryMaster.class.getName());
-   this.workerContext = workerContext;
- }
-
- /**
-  * Reads the configuration, creates the context, clock, dispatcher and global
-  * planner, and registers the query start / stop event handlers.
-  *
-  * @param conf configuration; it is cast to {@link TajoConf}
-  * @throws RuntimeException wrapping any failure during set-up
-  */
- @Override
- public void init(Configuration conf) {
-   LOG.info("QueryMaster init");
-   try {
-     this.systemConf = (TajoConf)conf;
-     this.connPool = RpcConnectionPool.getPool(systemConf);
-
-     querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
-     queryMasterContext = new QueryMasterContext(systemConf);
-
-     clock = new SystemClock();
-
-     this.dispatcher = new AsyncDispatcher();
-     addIfService(dispatcher);
-
-     globalPlanner = new GlobalPlanner(systemConf, workerContext);
-
-     dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler());
-     dispatcher.register(QueryStopEvent.EventType.class, new QueryStopEventHandler());
-
-   } catch (Throwable t) {
-     LOG.error(t.getMessage(), t);
-     throw new RuntimeException(t);
-   }
-   super.init(conf);
- }
-
- /**
-  * Creates the event executor, starts the heartbeat, client-session-timeout
-  * and finished-task-cleaning threads, and then starts the child services.
-  */
- @Override
- public void start() {
-   LOG.info("QueryMaster start");
-
-   // Create the executor before any background thread runs, so that
-   // QueryMasterContext.getEventExecutor() does not return null while they start.
-   eventExecutor = Executors.newSingleThreadExecutor();
-
-   queryHeartbeatThread = new QueryHeartbeatThread();
-   queryHeartbeatThread.start();
-
-   clientSessionTimeoutCheckThread = new ClientSessionTimeoutCheckThread();
-   clientSessionTimeoutCheckThread.start();
-
-   finishedQueryMasterTaskCleanThread = new FinishedQueryMasterTaskCleanThread();
-   finishedQueryMasterTaskCleanThread.start();
-
-   super.start();
- }
-
- /**
-  * Stops the QueryMaster once: interrupts the background threads, shuts the
-  * event executor down, stops the child services and, in YARN container mode,
-  * stops the worker. Further calls return immediately.
-  */
- @Override
- public void stop() {
-   if(queryMasterStop.getAndSet(true)){
-     return;
-   }
-
-   if(queryHeartbeatThread != null) {
-     queryHeartbeatThread.interrupt();
-   }
-
-   if(clientSessionTimeoutCheckThread != null) {
-     clientSessionTimeoutCheckThread.interrupt();
-   }
-
-   if(finishedQueryMasterTaskCleanThread != null) {
-     finishedQueryMasterTaskCleanThread.interrupt();
-   }
-
-   if(eventExecutor != null){
-     eventExecutor.shutdown();
-   }
-
-   super.stop();
-
-   LOG.info("QueryMaster stop");
-   // Use the field directly: queryMasterContext is still null when init() failed
-   // before creating it, and its getWorkerContext() returns this same field.
-   if(workerContext.isYarnContainerMode()) {
-     workerContext.stopWorker(true);
-   }
- }
-
- /**
-  * Asks every worker returned by {@link #getAllWorker()} to clean up the given
-  * execution blocks. A failure for one worker is logged and does not stop the
-  * remaining workers from being contacted.
-  *
-  * @param executionBlockIds ids of the execution blocks to clean up
-  */
- protected void cleanupExecutionBlock(List<TajoIdProtos.ExecutionBlockIdProto> executionBlockIds) {
-   StringBuilder cleanupMessage = new StringBuilder();
-   String prefix = "";
-   for (TajoIdProtos.ExecutionBlockIdProto eachEbId: executionBlockIds) {
-     cleanupMessage.append(prefix).append(new ExecutionBlockId(eachEbId).toString());
-     prefix = ",";
-   }
-   LOG.info("cleanup executionBlocks: " + cleanupMessage);
-   List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
-   TajoWorkerProtocol.ExecutionBlockListProto.Builder builder = TajoWorkerProtocol.ExecutionBlockListProto.newBuilder();
-   builder.addAllExecutionBlockId(Lists.newArrayList(executionBlockIds));
-   TajoWorkerProtocol.ExecutionBlockListProto executionBlockListProto = builder.build();
-
-   for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
-     // Scoped per iteration: with a single variable shared by all iterations, a
-     // failed getConnection() left the previous worker's connection in place and
-     // the finally block released it a second time.
-     NettyClientBase rpc = null;
-     try {
-       TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
-       rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
-           TajoWorkerProtocol.class, true);
-       TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
-
-       tajoWorkerProtocolService.cleanupExecutionBlocks(null, executionBlockListProto, NullCallback.get());
-     } catch (Exception e) {
-       // Best effort: keep going with the next worker, but leave a trace.
-       LOG.warn(e.getMessage(), e);
-     } finally {
-       if (rpc != null) {
-         connPool.releaseConnection(rpc);
-       }
-     }
-   }
- }
-
- /**
-  * Asks every worker returned by {@link #getAllWorker()} to clean up the
-  * resources of a query. A failure for one worker is logged and does not stop
-  * the remaining workers from being contacted.
-  *
-  * @param queryId id of the query whose resources are to be cleaned up
-  */
- private void cleanup(QueryId queryId) {
-   LOG.info("cleanup query resources : " + queryId);
-   List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
-
-   for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
-     // Scoped per iteration so that a failed getConnection() cannot cause the
-     // previous worker's connection to be released a second time.
-     NettyClientBase rpc = null;
-     try {
-       TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
-       rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
-           TajoWorkerProtocol.class, true);
-       TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
-
-       tajoWorkerProtocolService.cleanup(null, queryId.getProto(), NullCallback.get());
-     } catch (Exception e) {
-       // Include the throwable so the stack trace is not lost.
-       LOG.error(e.getMessage(), e);
-     } finally {
-       if (rpc != null) {
-         connPool.releaseConnection(rpc);
-       }
-     }
-   }
- }
-
- /**
-  * Asks the TajoMaster for all worker resources, waiting at most two seconds
-  * for the answer.
-  *
-  * @return the worker resources, or an empty list if the request fails
-  */
- public List<TajoMasterProtocol.WorkerResourceProto> getAllWorker() {
-   NettyClientBase masterConnection = null;
-   try {
-     if (!systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-       masterConnection = connPool.getConnection(
-           queryMasterContext.getWorkerContext().getTajoMasterAddress(), TajoMasterProtocol.class, true);
-     } else {
-       // In TajoMaster HA mode, if backup master be active status,
-       // worker may fail to connect existing active master. Thus,
-       // if worker can't connect the master, worker should try to connect another master and
-       // update master address in worker context.
-       try {
-         masterConnection = connPool.getConnection(
-             queryMasterContext.getWorkerContext().getTajoMasterAddress(), TajoMasterProtocol.class, true);
-       } catch (Exception connectFailure) {
-         queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
-             HAServiceUtil.getResourceTrackerAddress(systemConf));
-         queryMasterContext.getWorkerContext().setTajoMasterAddress(
-             HAServiceUtil.getMasterUmbilicalAddress(systemConf));
-         // The address is read again on purpose: it was just updated above.
-         masterConnection = connPool.getConnection(
-             queryMasterContext.getWorkerContext().getTajoMasterAddress(), TajoMasterProtocol.class, true);
-       }
-     }
-
-     TajoMasterProtocol.TajoMasterProtocolService masterStub = masterConnection.getStub();
-
-     CallFuture<TajoMasterProtocol.WorkerResourcesRequest> future =
-         new CallFuture<TajoMasterProtocol.WorkerResourcesRequest>();
-     masterStub.getAllWorkerResource(future.getController(),
-         PrimitiveProtos.NullProto.getDefaultInstance(), future);
-
-     TajoMasterProtocol.WorkerResourcesRequest response = future.get(2, TimeUnit.SECONDS);
-     return response.getWorkerResourcesList();
-   } catch (Exception e) {
-     LOG.error(e.getMessage(), e);
-   } finally {
-     connPool.releaseConnection(masterConnection);
-   }
-   return new ArrayList<TajoMasterProtocol.WorkerResourceProto>();
- }
-
- /**
-  * Sends a heartbeat carrying the given query id and state, together with this
-  * worker's connection info, to the TajoMaster. The response is not awaited;
-  * failures are logged.
-  *
-  * @param queryId id of the query being reported
-  * @param state   state to report
-  */
- public void reportQueryStatusToQueryMaster(QueryId queryId, TajoProtos.QueryState state) {
-   // Log the state actually sent; the old message always claimed "Ready".
-   LOG.info("Send QueryMaster status " + state + " to QueryJobManager:" + queryId);
-   NettyClientBase tmClient = null;
-   try {
-     // In TajoMaster HA mode, if backup master be active status,
-     // worker may fail to connect existing active master. Thus,
-     // if worker can't connect the master, worker should try to connect another master and
-     // update master address in worker context.
-     if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-       try {
-         tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-             TajoMasterProtocol.class, true);
-       } catch (Exception e) {
-         queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
-             HAServiceUtil.getResourceTrackerAddress(systemConf));
-         queryMasterContext.getWorkerContext().setTajoMasterAddress(
-             HAServiceUtil.getMasterUmbilicalAddress(systemConf));
-         tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-             TajoMasterProtocol.class, true);
-       }
-     } else {
-       tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-           TajoMasterProtocol.class, true);
-     }
-
-     TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
-
-     TajoHeartbeat.Builder queryHeartbeatBuilder = TajoHeartbeat.newBuilder()
-         .setConnectionInfo(workerContext.getConnectionInfo().getProto())
-         .setState(state)
-         .setQueryId(queryId.getProto());
-
-     CallFuture<TajoHeartbeatResponse> callBack =
-         new CallFuture<TajoHeartbeatResponse>();
-
-     masterClientService.heartbeat(callBack.getController(), queryHeartbeatBuilder.build(), callBack);
-   } catch (Exception e) {
-     LOG.error(e.getMessage(), e);
-   } finally {
-     connPool.releaseConnection(tmClient);
-   }
- }
-
- @Override
- public void handle(Event event) {
- dispatcher.getEventHandler().handle(event);
- }
-
- public Query getQuery(QueryId queryId) {
- return queryMasterTasks.get(queryId).getQuery();
- }
-
- /**
-  * @param queryId id of the query
-  * @return the active task for the query, or {@code null} if there is none
-  */
- public QueryMasterTask getQueryMasterTask(QueryId queryId) {
-   return queryMasterTasks.get(queryId);
- }
-
- /**
-  * Looks a task up among the active tasks and, optionally, the finished ones.
-  *
-  * @param queryId         id of the query
-  * @param includeFinished whether to fall back to the finished tasks
-  * @return the matching task, or {@code null} if none is found
-  */
- public QueryMasterTask getQueryMasterTask(QueryId queryId, boolean includeFinished) {
-   QueryMasterTask activeTask = queryMasterTasks.get(queryId);
-   if (activeTask != null) {
-     return activeTask;
-   }
-   if (!includeFinished) {
-     return null;
-   }
-   return finishedQueryMasterTasks.get(queryId);
- }
-
- /** @return the context object created in {@code init} */
- public QueryMasterContext getContext() {
-   return queryMasterContext;
- }
-
- /** @return the values view of the active task map (not a copy) */
- public Collection<QueryMasterTask> getQueryMasterTasks() {
-   return this.queryMasterTasks.values();
- }
-
- /** @return the values view of the finished task map (not a copy) */
- public Collection<QueryMasterTask> getFinishedQueryMasterTasks() {
-   return this.finishedQueryMasterTasks.values();
- }
-
- public class QueryMasterContext {
- private TajoConf conf;
-
- /** @param conf configuration handed out by {@link #getConf()} */
- public QueryMasterContext(TajoConf conf) {
-   this.conf = conf;
- }
-
- public TajoConf getConf() {
-   return this.conf;
- }
-
- /** @return the enclosing QueryMaster's event executor */
- public ExecutorService getEventExecutor(){
-   return QueryMaster.this.eventExecutor;
- }
-
- /** @return the enclosing QueryMaster's dispatcher */
- public AsyncDispatcher getDispatcher() {
-   return QueryMaster.this.dispatcher;
- }
-
- public Clock getClock() {
-   return QueryMaster.this.clock;
- }
-
- /** @return the enclosing QueryMaster instance */
- public QueryMaster getQueryMaster() {
-   return QueryMaster.this;
- }
-
- public GlobalPlanner getGlobalPlanner() {
-   return QueryMaster.this.globalPlanner;
- }
-
- public TajoWorker.WorkerContext getWorkerContext() {
-   return QueryMaster.this.workerContext;
- }
-
- /** @return the event handler of the enclosing QueryMaster's dispatcher */
- public EventHandler getEventHandler() {
-   return QueryMaster.this.dispatcher.getEventHandler();
- }
-
- public void stopQuery(QueryId queryId) {
- QueryMasterTask queryMasterTask = queryMasterTasks.remove(queryId);
- if(queryMasterTask == null) {
- LOG.warn("No query info:" + queryId);
- return;
- }
-
- finishedQueryMasterTasks.put(queryId, queryMasterTask);
-
- TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(queryMasterTask);
- CallFuture<TajoHeartbeatResponse> future = new CallFuture<TajoHeartbeatResponse>();
-
- NettyClientBase tmClient = null;
- try {
- // In TajoMaster HA mode, if backup master be active status,
- // worker may fail to connect existing active master. Thus,
- // if worker can't connect the master, worker should try to connect another master and
- // update master address in worker context.
- if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- try {
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
- } catch (Exception e) {
- queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
- queryMasterContext.getWorkerContext().setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
- }
- } else {
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
- }
-
- TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
- masterClientService.heartbeat(future.getController(), queryHeartbeat, future);
- } catch (Exception e) {
- //this function will be closed in new thread.
- //When tajo do stop cluster, tajo master maybe throw closed connection exception
-
- LOG.error(e.getMessage(), e);
- } finally {
- connPool.releaseConnection(tmClient);
- }
-
- try {
- queryMasterTask.stop();
- if (!queryContext.getBool(SessionVars.DEBUG_ENABLED)) {
- cleanup(queryId);
- }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- Query query = queryMasterTask.getQuery();
- if (query != null) {
- QueryHistory queryHisory = query.getQueryHistory();
- if (queryHisory != null) {
- query.context.getQueryMasterContext().getWorkerContext().
- getTaskHistoryWriter().appendHistory(queryHisory);
- }
- }
- if(workerContext.isYarnContainerMode()) {
- stop();
- }
- }
- }
-
- private TajoHeartbeat buildTajoHeartBeat(QueryMasterTask queryMasterTask) {
- TajoHeartbeat.Builder builder = TajoHeartbeat.newBuilder();
-
- builder.setConnectionInfo(workerContext.getConnectionInfo().getProto());
- builder.setQueryId(queryMasterTask.getQueryId().getProto());
- builder.setState(queryMasterTask.getState());
- if (queryMasterTask.getQuery() != null) {
- if (queryMasterTask.getQuery().getResultDesc() != null) {
- builder.setResultDesc(queryMasterTask.getQuery().getResultDesc().getProto());
- }
- builder.setQueryProgress(queryMasterTask.getQuery().getProgress());
- builder.setQueryFinishTime(queryMasterTask.getQuery().getFinishTime());
- }
- return builder.build();
- }
-
- private class QueryStartEventHandler implements EventHandler<QueryStartEvent> {
- @Override
- public void handle(QueryStartEvent event) {
- LOG.info("Start QueryStartEventHandler:" + event.getQueryId());
- QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext,
- event.getQueryId(), event.getSession(), event.getQueryContext(), event.getJsonExpr(), event.getLogicalPlanJson());
-
- synchronized(queryMasterTasks) {
- queryMasterTasks.put(event.getQueryId(), queryMasterTask);
- }
-
- queryMasterTask.init(systemConf);
- if (!queryMasterTask.isInitError()) {
- queryMasterTask.start();
- }
-
- queryContext = event.getQueryContext();
-
- if (queryMasterTask.isInitError()) {
- queryMasterContext.stopQuery(queryMasterTask.getQueryId());
- return;
- }
- }
- }
-
- private class QueryStopEventHandler implements EventHandler<QueryStopEvent> {
- @Override
- public void handle(QueryStopEvent event) {
- queryMasterContext.stopQuery(event.getQueryId());
- }
- }
-
- class QueryHeartbeatThread extends Thread {
- public QueryHeartbeatThread() {
- super("QueryHeartbeatThread");
- }
-
- @Override
- public void run() {
- LOG.info("Start QueryMaster heartbeat thread");
- while(!queryMasterStop.get()) {
- List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>();
- synchronized(queryMasterTasks) {
- tempTasks.addAll(queryMasterTasks.values());
- }
- synchronized(queryMasterTasks) {
- for(QueryMasterTask eachTask: tempTasks) {
- NettyClientBase tmClient;
- try {
- // In TajoMaster HA mode, if backup master be active status,
- // worker may fail to connect existing active master. Thus,
- // if worker can't connect the master, worker should try to connect another master and
- // update master address in worker context.
- if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- try {
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
- } catch (Exception e) {
- queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
- HAServiceUtil.getResourceTrackerAddress(systemConf));
- queryMasterContext.getWorkerContext().setTajoMasterAddress(
- HAServiceUtil.getMasterUmbilicalAddress(systemConf));
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
- }
- } else {
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
- }
-
- TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
-
- CallFuture<TajoHeartbeatResponse> callBack =
- new CallFuture<TajoHeartbeatResponse>();
-
- TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask);
- masterClientService.heartbeat(callBack.getController(), queryHeartbeat, callBack);
- } catch (Throwable t) {
- t.printStackTrace();
- }
- }
- }
- synchronized(queryMasterStop) {
- try {
- queryMasterStop.wait(2000);
- } catch (InterruptedException e) {
- break;
- }
- }
- }
- LOG.info("QueryMaster heartbeat thread stopped");
- }
- }
-
- class ClientSessionTimeoutCheckThread extends Thread {
- public void run() {
- LOG.info("ClientSessionTimeoutCheckThread started");
- while(!queryMasterStop.get()) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- break;
- }
- List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>();
- synchronized(queryMasterTasks) {
- tempTasks.addAll(queryMasterTasks.values());
- }
-
- for(QueryMasterTask eachTask: tempTasks) {
- if(!eachTask.isStopped()) {
- try {
- long lastHeartbeat = eachTask.getLastClientHeartbeat();
- long time = System.currentTimeMillis() - lastHeartbeat;
- if(lastHeartbeat > 0 && time > querySessionTimeout * 1000) {
- LOG.warn("Query " + eachTask.getQueryId() + " stopped cause query session timeout: " + time + " ms");
- eachTask.expireQuerySession();
- }
- } catch (Exception e) {
- LOG.error(eachTask.getQueryId() + ":" + e.getMessage(), e);
- }
- }
- }
- }
- }
- }
-
- class FinishedQueryMasterTaskCleanThread extends Thread {
- public void run() {
- int expireIntervalTime = systemConf.getIntVar(TajoConf.ConfVars.WORKER_HISTORY_EXPIRE_PERIOD);
- LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval minutes = " + expireIntervalTime);
- while(!queryMasterStop.get()) {
- try {
- Thread.sleep(60 * 1000 * 60); // hourly
- } catch (InterruptedException e) {
- break;
- }
- try {
- long expireTime = System.currentTimeMillis() - expireIntervalTime * 60 * 1000;
- cleanExpiredFinishedQueryMasterTask(expireTime);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
- }
-
- private void cleanExpiredFinishedQueryMasterTask(long expireTime) {
- synchronized(finishedQueryMasterTasks) {
- List<QueryId> expiredQueryIds = new ArrayList<QueryId>();
- for(Map.Entry<QueryId, QueryMasterTask> entry: finishedQueryMasterTasks.entrySet()) {
- if(entry.getValue().getStartTime() < expireTime) {
- expiredQueryIds.add(entry.getKey());
- }
- }
-
- for(QueryId eachId: expiredQueryIds) {
- finishedQueryMasterTasks.remove(eachId);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
deleted file mode 100644
index 9f7d3f8..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.tajo.*;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.DefaultTaskScheduler;
-import org.apache.tajo.master.container.TajoContainerId;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.session.Session;
-import org.apache.tajo.rpc.AsyncRpcServer;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.worker.TajoWorker;
-
-import java.net.InetSocketAddress;
-
-public class QueryMasterManagerService extends CompositeService
- implements QueryMasterProtocol.QueryMasterProtocolService.Interface {
- private static final Log LOG = LogFactory.getLog(QueryMasterManagerService.class.getName());
-
- private AsyncRpcServer rpcServer;
- private InetSocketAddress bindAddr;
- private String addr;
- private int port;
-
- private QueryMaster queryMaster;
-
- private TajoWorker.WorkerContext workerContext;
-
- public QueryMasterManagerService(TajoWorker.WorkerContext workerContext, int port) {
- super(QueryMasterManagerService.class.getName());
- this.workerContext = workerContext;
- this.port = port;
- }
-
- public QueryMaster getQueryMaster() {
- return queryMaster;
- }
-
- @Override
- public void init(Configuration conf) {
- Preconditions.checkArgument(conf instanceof TajoConf);
- TajoConf tajoConf = (TajoConf) conf;
- try {
- // Setup RPC server
- InetSocketAddress initIsa =
- new InetSocketAddress("0.0.0.0", port);
- if (initIsa.getAddress() == null) {
- throw new IllegalArgumentException("Failed resolve of " + initIsa);
- }
-
- int workerNum = tajoConf.getIntVar(TajoConf.ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM);
- this.rpcServer = new AsyncRpcServer(QueryMasterProtocol.class, this, initIsa, workerNum);
- this.rpcServer.start();
-
- this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
- this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
-
- this.port = bindAddr.getPort();
-
- queryMaster = new QueryMaster(workerContext);
- addService(queryMaster);
-
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- // Get the master address
- LOG.info("QueryMasterManagerService is bind to " + addr);
- ((TajoConf)conf).setVar(TajoConf.ConfVars.WORKER_QM_RPC_ADDRESS, addr);
-
- super.init(conf);
- }
-
- @Override
- public void start() {
- super.start();
- }
-
- @Override
- public void stop() {
- if(rpcServer != null) {
- rpcServer.shutdown();
- }
- LOG.info("QueryMasterManagerService stopped");
- super.stop();
- }
-
- public InetSocketAddress getBindAddr() {
- return bindAddr;
- }
-
- public String getHostAndPort() {
- return bindAddr.getHostName() + ":" + bindAddr.getPort();
- }
-
- @Override
- public void getTask(RpcController controller, TajoWorkerProtocol.GetTaskRequestProto request,
- RpcCallback<TajoWorkerProtocol.TaskRequestProto> done) {
- try {
- ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId());
- QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId());
-
- if(queryMasterTask == null || queryMasterTask.isStopped()) {
- done.run(DefaultTaskScheduler.stopTaskRunnerReq);
- } else {
- TajoContainerId cid =
- queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
- LOG.debug("getTask:" + cid + ", ebId:" + ebId);
- queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(request.getWorkerId(), cid, ebId, done));
- }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- @Override
- public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- try {
- QueryId queryId = new QueryId(request.getId().getTaskId().getExecutionBlockId().getQueryId());
- TaskAttemptId attemptId = new TaskAttemptId(request.getId());
- QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
- if (queryMasterTask == null) {
- queryMasterTask = queryMaster.getQueryMasterTask(queryId, true);
- }
- Stage sq = queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId());
- Task task = sq.getTask(attemptId.getTaskId());
- TaskAttempt attempt = task.getAttempt(attemptId.getId());
-
- if(LOG.isDebugEnabled()){
- LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
- }
-
- if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
- LOG.warn(attemptId + " Killed");
- attempt.handle(
- new TaskAttemptEvent(new TaskAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED));
- } else {
- queryMasterTask.getEventHandler().handle(
- new TaskAttemptStatusUpdateEvent(new TaskAttemptId(request.getId()), request));
- }
- done.run(TajoWorker.TRUE_PROTO);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- done.run(TajoWorker.FALSE_PROTO);
- }
- }
-
- @Override
- public void ping(RpcController controller,
- TajoIdProtos.ExecutionBlockIdProto requestProto,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- done.run(TajoWorker.TRUE_PROTO);
- }
-
- @Override
- public void fatalError(RpcController controller, TajoWorkerProtocol.TaskFatalErrorReport report,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- try {
- QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
- new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
- if (queryMasterTask != null) {
- queryMasterTask.handleTaskFailed(report);
- } else {
- LOG.warn("No QueryMasterTask: " + new TaskAttemptId(report.getId()));
- }
- done.run(TajoWorker.TRUE_PROTO);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- done.run(TajoWorker.FALSE_PROTO);
- }
- }
-
- @Override
- public void done(RpcController controller, TajoWorkerProtocol.TaskCompletionReport report,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- try {
- QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
- new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
- if (queryMasterTask != null) {
- queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
- }
- done.run(TajoWorker.TRUE_PROTO);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- done.run(TajoWorker.FALSE_PROTO);
- }
- }
-
- @Override
- public void doneExecutionBlock(
- RpcController controller, TajoWorkerProtocol.ExecutionBlockReport request,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(new QueryId(request.getEbId().getQueryId()));
- if (queryMasterTask != null) {
- ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId());
- queryMasterTask.getQuery().getStage(ebId).receiveExecutionBlockReport(request);
- }
- done.run(TajoWorker.TRUE_PROTO);
- }
-
- @Override
- public void killQuery(RpcController controller, TajoIdProtos.QueryIdProto request,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- QueryId queryId = new QueryId(request);
- QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
- if (queryMasterTask != null) {
- Query query = queryMasterTask.getQuery();
- if (query != null) {
- query.handle(new QueryEvent(queryId, QueryEventType.KILL));
- }
- }
- }
-
- @Override
- public void executeQuery(RpcController controller,
- TajoWorkerProtocol.QueryExecutionRequestProto request,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- try {
- workerContext.getWorkerSystemMetrics().counter("querymaster", "numQuery").inc();
-
- QueryId queryId = new QueryId(request.getQueryId());
- LOG.info("Receive executeQuery request:" + queryId);
- queryMaster.handle(new QueryStartEvent(queryId,
- new Session(request.getSession()),
- new QueryContext(workerContext.getQueryMaster().getContext().getConf(),
- request.getQueryContext()), request.getExprInJson().getValue(),
- request.getLogicalPlanJson().getValue()));
- done.run(TajoWorker.TRUE_PROTO);
- } catch (Exception e) {
- workerContext.getWorkerSystemMetrics().counter("querymaster", "errorQuery").inc();
- LOG.error(e.getMessage(), e);
- done.run(TajoWorker.FALSE_PROTO);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
deleted file mode 100644
index 56dd789..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.util.TajoIdUtils;
-
-import java.io.PrintWriter;
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadInfo;
-import java.lang.management.ThreadMXBean;
-
-@Deprecated
-public class QueryMasterRunner extends AbstractService {
- private static final Log LOG = LogFactory.getLog(QueryMasterRunner.class);
- private TajoConf systemConf;
- private QueryMaster queryMaster;
- private QueryId queryId;
- private String queryMasterManagerAddress;
-
- public QueryMasterRunner(QueryId queryId, String queryMasterManagerAddress) {
- super(QueryMasterRunner.class.getName());
- this.queryId = queryId;
- this.queryMasterManagerAddress = queryMasterManagerAddress;
- }
-
- private class ShutdownHook implements Runnable {
- @Override
- public void run() {
- LOG.info("============================================");
- LOG.info("QueryMaster received SIGINT Signal");
- LOG.info("============================================");
- stop();
- }
- }
-
- @Override
- public void init(Configuration conf) {
- this.systemConf = (TajoConf)conf;
- RackResolver.init(systemConf);
- Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
- super.init(conf);
- }
-
- @Override
- public void start() {
- //create QueryMaster
- QueryMaster query = new QueryMaster(null);
-
- query.init(systemConf);
- query.start();
- }
-
- @Override
- public void stop() {
- }
-
- public static void main(String[] args) throws Exception {
- LOG.info("QueryMasterRunner started");
-
- final TajoConf conf = new TajoConf();
- conf.addResource(new Path(TajoConstants.SYSTEM_CONF_FILENAME));
-
- UserGroupInformation.setConfiguration(conf);
-
- final QueryId queryId = TajoIdUtils.parseQueryId(args[0]);
- final String queryMasterManagerAddr = args[1];
-
- LOG.info("Received QueryId:" + queryId);
-
- QueryMasterRunner queryMasterRunner = new QueryMasterRunner(queryId, queryMasterManagerAddr);
- queryMasterRunner.init(conf);
- queryMasterRunner.start();
-
- synchronized(queryId) {
- queryId.wait();
- }
-
- System.exit(0);
- }
-
- public static void printThreadInfo(PrintWriter stream, String title) {
- ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
- final int STACK_DEPTH = 60;
- boolean contention = threadBean.isThreadContentionMonitoringEnabled();
- long[] threadIds = threadBean.getAllThreadIds();
- stream.println("Process Thread Dump: " + title);
- stream.println(threadIds.length + " active threads");
- for (long tid : threadIds) {
- ThreadInfo info = threadBean.getThreadInfo(tid, STACK_DEPTH);
- if (info == null) {
- stream.println(" Inactive");
- continue;
- }
- stream.println("Thread " + getTaskName(info.getThreadId(), info.getThreadName()) + ":");
- Thread.State state = info.getThreadState();
- stream.println(" State: " + state);
- stream.println(" Blocked count: " + info.getBlockedCount());
- stream.println(" Waited count: " + info.getWaitedCount());
- if (contention) {
- stream.println(" Blocked time: " + info.getBlockedTime());
- stream.println(" Waited time: " + info.getWaitedTime());
- }
- if (state == Thread.State.WAITING) {
- stream.println(" Waiting on " + info.getLockName());
- } else if (state == Thread.State.BLOCKED) {
- stream.println(" Blocked on " + info.getLockName());
- stream.println(" Blocked by " + getTaskName(info.getLockOwnerId(), info.getLockOwnerName()));
- }
- stream.println(" Stack:");
- for (StackTraceElement frame : info.getStackTrace()) {
- stream.println(" " + frame.toString());
- }
- }
- stream.flush();
- }
-
- private static String getTaskName(long id, String name) {
- if (name == null) {
- return Long.toString(id);
- }
- return id + " (" + name + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
deleted file mode 100644
index 9c789a5..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ /dev/null
@@ -1,638 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.tajo.*;
-import org.apache.tajo.algebra.Expr;
-import org.apache.tajo.algebra.JsonHelper;
-import org.apache.tajo.catalog.CatalogService;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.exception.UnimplementedException;
-import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.TajoContainerProxy;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.rm.TajoWorkerResourceManager;
-import org.apache.tajo.master.session.Session;
-import org.apache.tajo.plan.LogicalOptimizer;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.LogicalPlanner;
-import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.plan.logical.NodeType;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.plan.verifier.VerifyException;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.StorageProperty;
-import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.util.metrics.TajoMetrics;
-import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
-import org.apache.tajo.worker.AbstractResourceAllocator;
-import org.apache.tajo.worker.TajoResourceAllocator;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.tajo.TajoProtos.QueryState;
-
-public class QueryMasterTask extends CompositeService {
- private static final Log LOG = LogFactory.getLog(QueryMasterTask.class.getName());
-
- // query submission directory is private!
- final public static FsPermission STAGING_DIR_PERMISSION =
- FsPermission.createImmutable((short) 0700); // rwx--------
-
- public static final String TMP_STAGING_DIR_PREFIX = ".staging";
-
- private QueryId queryId;
-
- private Session session;
-
- private QueryContext queryContext;
-
- private QueryMasterTaskContext queryTaskContext;
-
- private QueryMaster.QueryMasterContext queryMasterContext;
-
- private Query query;
-
- private MasterPlan masterPlan;
-
- private String jsonExpr;
-
- private String logicalPlanJson;
-
- private AsyncDispatcher dispatcher;
-
- private final long querySubmitTime;
-
- private Map<String, TableDesc> tableDescMap = new HashMap<String, TableDesc>();
-
- private TajoConf systemConf;
-
- private AtomicLong lastClientHeartbeat = new AtomicLong(-1);
-
- private AbstractResourceAllocator resourceAllocator;
-
- private AtomicBoolean stopped = new AtomicBoolean(false);
-
- private TajoMetrics queryMetrics;
-
- private Throwable initError;
-
- private final List<TajoWorkerProtocol.TaskFatalErrorReport> diagnostics =
- new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>();
-
- public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
- QueryId queryId, Session session, QueryContext queryContext, String jsonExpr,
- String logicalPlanJson) {
-
- super(QueryMasterTask.class.getName());
- this.queryMasterContext = queryMasterContext;
- this.queryId = queryId;
- this.session = session;
- this.queryContext = queryContext;
- this.jsonExpr = jsonExpr;
- this.logicalPlanJson = logicalPlanJson;
- this.querySubmitTime = System.currentTimeMillis();
- }
-
- @Override
- public void init(Configuration conf) {
- systemConf = (TajoConf)conf;
-
- try {
- queryTaskContext = new QueryMasterTaskContext();
- String resourceManagerClassName = systemConf.getVar(TajoConf.ConfVars.RESOURCE_MANAGER_CLASS);
-
- if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) >= 0) {
- resourceAllocator = new TajoResourceAllocator(queryTaskContext);
- } else {
- throw new UnimplementedException(resourceManagerClassName + " is not supported yet");
- }
- addService(resourceAllocator);
-
- dispatcher = new AsyncDispatcher();
- addService(dispatcher);
-
- dispatcher.register(StageEventType.class, new StageEventDispatcher());
- dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
- dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
- dispatcher.register(QueryMasterQueryCompletedEvent.EventType.class, new QueryFinishEventHandler());
- dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher());
- dispatcher.register(LocalTaskEventType.class, new LocalTaskEventHandler());
-
- initStagingDir();
-
- queryMetrics = new TajoMetrics(queryId.toString());
-
- super.init(systemConf);
- } catch (Throwable t) {
- LOG.error(t.getMessage(), t);
- initError = t;
- }
- }
-
- public boolean isStopped() {
- return stopped.get();
- }
-
- @Override
- public void start() {
- startQuery();
- super.start();
- }
-
- /**
-  * Stops this task. Only the first call has any effect; later calls return immediately.
-  * The resource allocator is stopped first, then a connection to the Tajo master is obtained
-  * and released again (in HA mode this refreshes the master addresses kept in the worker
-  * context when the current master cannot be reached), then the service is stopped and the
-  * query metrics are reported.
-  */
- @Override
- public void stop() {
-   if (stopped.getAndSet(true)) {
-     return;
-   }
-
-   LOG.info("Stopping QueryMasterTask:" + queryId);
-
-   // init() records failures in initError instead of throwing, so the allocator may never
-   // have been created; skip it rather than logging a spurious fatal NullPointerException.
-   if (resourceAllocator != null) {
-     try {
-       resourceAllocator.stop();
-     } catch (Throwable t) {
-       LOG.fatal(t.getMessage(), t);
-     }
-   }
-
-   RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf());
-   NettyClientBase tmClient = null;
-   try {
-     // In TajoMaster HA mode the backup master may have become active, in which case the
-     // worker can fail to connect to the previously active master. If the connection fails,
-     // look up the new master and update the master addresses in the worker context.
-     if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-       try {
-         tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-             TajoMasterProtocol.class, true);
-       } catch (Exception e) {
-         queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
-             HAServiceUtil.getResourceTrackerAddress(systemConf));
-         queryMasterContext.getWorkerContext().setTajoMasterAddress(
-             HAServiceUtil.getMasterUmbilicalAddress(systemConf));
-         tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-             TajoMasterProtocol.class, true);
-       }
-     } else {
-       tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-           TajoMasterProtocol.class, true);
-     }
-   } catch (Exception e) {
-     LOG.error(e.getMessage(), e);
-   } finally {
-     connPool.releaseConnection(tmClient);
-   }
-
-   super.stop();
-
-   //TODO change report to tajo master
-   if (queryMetrics != null) {
-     queryMetrics.report(new MetricsConsoleReporter());
-   }
-
-   LOG.info("Stopped QueryMasterTask:" + queryId);
- }
-
- /**
-  * Forwards a task request to the stage identified by the event's execution block id.
-  *
-  * @param event the task request to forward
-  */
- public void handleTaskRequestEvent(TaskRequestEvent event) {
-   query.getStage(event.getExecutionBlockId()).handleTaskRequestEvent(event);
- }
-
- public void handleTaskFailed(TajoWorkerProtocol.TaskFatalErrorReport report) {
- synchronized(diagnostics) {
- if (diagnostics.size() < 10) {
- diagnostics.add(report);
- }
- }
-
- getEventHandler().handle(new TaskFatalErrorEvent(report));
- }
-
- /**
-  * Returns the fatal error reports collected so far.
-  *
-  * The result is an unmodifiable snapshot copied while holding the {@code diagnostics} lock,
-  * so callers can iterate over it without racing against {@link #handleTaskFailed}, which
-  * appends to the underlying collection under the same lock.
-  *
-  * @return an unmodifiable copy of the collected reports
-  */
- public Collection<TajoWorkerProtocol.TaskFatalErrorReport> getDiagnostics() {
-   synchronized (diagnostics) {
-     return Collections.unmodifiableList(
-         new java.util.ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>(diagnostics));
-   }
- }
-
- /** Routes each {@link StageEvent} to the stage named by the event's stage id. */
- private class StageEventDispatcher implements EventHandler<StageEvent> {
-   public void handle(StageEvent event) {
-     ExecutionBlockId stageId = event.getStageId();
-     if (LOG.isDebugEnabled()) {
-       LOG.debug("StageEventDispatcher:" + stageId + "," + event.getType());
-     }
-     Stage target = query.getStage(stageId);
-     target.handle(event);
-   }
- }
-
- private class TaskEventDispatcher
- implements EventHandler<TaskEvent> {
- public void handle(TaskEvent event) {
- TaskId taskId = event.getTaskId();
- if(LOG.isDebugEnabled()) {
- LOG.debug("TaskEventDispatcher>" + taskId + "," + event.getType());
- }
- Task task = query.getStage(taskId.getExecutionBlockId()).
- getTask(taskId);
- task.handle(event);
- }
- }
-
- private class TaskAttemptEventDispatcher
- implements EventHandler<TaskAttemptEvent> {
- public void handle(TaskAttemptEvent event) {
- TaskAttemptId attemptId = event.getTaskAttemptId();
- Stage stage = query.getStage(attemptId.getTaskId().getExecutionBlockId());
- Task task = stage.getTask(attemptId.getTaskId());
- TaskAttempt attempt = task.getAttempt(attemptId);
- attempt.handle(event);
- }
- }
-
- private class TaskSchedulerDispatcher
- implements EventHandler<TaskSchedulerEvent> {
- public void handle(TaskSchedulerEvent event) {
- Stage stage = query.getStage(event.getExecutionBlockId());
- stage.getTaskScheduler().handle(event);
- }
- }
-
- /**
-  * Kills the task attempt named by a {@link LocalTaskEvent} through the container proxy
-  * registered for the event's container id; does nothing when no proxy is registered.
-  */
- private class LocalTaskEventHandler implements EventHandler<LocalTaskEvent> {
-   @Override
-   public void handle(LocalTaskEvent event) {
-     TajoContainerProxy containerProxy =
-         (TajoContainerProxy) resourceAllocator.getContainers().get(event.getContainerId());
-     if (containerProxy == null) {
-       return;
-     }
-     containerProxy.killTaskAttempt(event.getTaskAttemptId());
-   }
- }
-
- private class QueryFinishEventHandler implements EventHandler<QueryMasterQueryCompletedEvent> {
- @Override
- public void handle(QueryMasterQueryCompletedEvent event) {
- QueryId queryId = event.getQueryId();
- LOG.info("Query completion notified from " + queryId);
-
- while (!isTerminatedState(query.getSynchronizedState())) {
- try {
- synchronized (this) {
- wait(10);
- }
- } catch (InterruptedException e) {
- LOG.error(e);
- }
- }
- LOG.info("Query final state: " + query.getSynchronizedState());
-
- queryMasterContext.getEventHandler().handle(new QueryStopEvent(queryId));
- }
- }
-
- /**
-  * @param state the query state to test
-  * @return true if the state is QUERY_SUCCEEDED, QUERY_FAILED, QUERY_KILLED or QUERY_ERROR
-  */
- private static boolean isTerminatedState(QueryState state) {
-   if (state == QueryState.QUERY_SUCCEEDED || state == QueryState.QUERY_FAILED) {
-     return true;
-   }
-   return state == QueryState.QUERY_KILLED || state == QueryState.QUERY_ERROR;
- }
-
- public synchronized void startQuery() {
- StorageManager sm = null;
- LogicalPlan plan = null;
- try {
- if (query != null) {
- LOG.warn("Query already started");
- return;
- }
- CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
- LogicalPlanner planner = new LogicalPlanner(catalog);
- LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
- Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
- jsonExpr = null; // remove the possible OOM
- plan = planner.createPlan(queryContext, expr);
-
- StoreType storeType = PlannerUtil.getStoreType(plan);
- if (storeType != null) {
- sm = StorageManager.getStorageManager(systemConf, storeType);
- StorageProperty storageProperty = sm.getStorageProperty();
- if (storageProperty.isSortedInsert()) {
- String tableName = PlannerUtil.getStoreTableName(plan);
- LogicalRootNode rootNode = plan.getRootBlock().getRoot();
- TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild());
- if (tableDesc == null) {
- throw new VerifyException("Can't get table meta data from catalog: " + tableName);
- }
- List<LogicalPlanRewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules(
- getQueryTaskContext().getQueryContext(), tableDesc);
- if (storageSpecifiedRewriteRules != null) {
- for (LogicalPlanRewriteRule eachRule: storageSpecifiedRewriteRules) {
- optimizer.addRuleAfterToJoinOpt(eachRule);
- }
- }
- }
- }
-
- optimizer.optimize(queryContext, plan);
-
- for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
- LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.SCAN);
- if (scanNodes != null) {
- for (LogicalNode eachScanNode : scanNodes) {
- ScanNode scanNode = (ScanNode) eachScanNode;
- tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
- }
- }
-
- scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.PARTITIONS_SCAN);
- if (scanNodes != null) {
- for (LogicalNode eachScanNode : scanNodes) {
- ScanNode scanNode = (ScanNode) eachScanNode;
- tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
- }
- }
- }
- MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
- queryMasterContext.getGlobalPlanner().build(masterPlan);
-
- query = new Query(queryTaskContext, queryId, querySubmitTime,
- "", queryTaskContext.getEventHandler(), masterPlan);
-
- dispatcher.register(QueryEventType.class, query);
- queryTaskContext.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.START));
- } catch (Throwable t) {
- LOG.error(t.getMessage(), t);
- initError = t;
-
- if (plan != null && sm != null) {
- LogicalRootNode rootNode = plan.getRootBlock().getRoot();
- try {
- sm.rollbackOutputCommit(rootNode.getChild());
- } catch (IOException e) {
- LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e);
- }
- }
- }
- }
-
- /**
-  * Creates the staging directory for this query and stores it in the query context.
-  *
-  * On failure a best-effort cleanup of the staging directory is attempted and the original
-  * exception is rethrown; a failure of the cleanup itself is only logged so that it can never
-  * replace the original error.
-  *
-  * @throws IOException if the staging directory cannot be initialized
-  */
- private void initStagingDir() throws IOException {
-   Path stagingDir = null;
-   FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf);
-
-   try {
-     stagingDir = initStagingDir(systemConf, queryId.toString(), queryContext);
-
-     LOG.info("The staging dir '" + stagingDir + "' is created.");
-     queryContext.setStagingDir(stagingDir);
-   } catch (IOException ioe) {
-     try {
-       // The existence check is inside the try as well: it may itself throw an IOException.
-       if (stagingDir != null && defaultFS.exists(stagingDir)) {
-         defaultFS.delete(stagingDir, true);
-         LOG.info("The staging directory '" + stagingDir + "' is deleted");
-       }
-     } catch (Exception e) {
-       LOG.warn(e.getMessage(), e);
-     }
-
-     throw ioe;
-   }
- }
-
- /**
- * It initializes the final output and staging directory and sets
- * them to variables.
- */
- public static Path initStagingDir(TajoConf conf, String queryId, QueryContext context) throws IOException {
-
- String realUser;
- String currentUser;
- UserGroupInformation ugi;
- ugi = UserGroupInformation.getLoginUser();
- realUser = ugi.getShortUserName();
- currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
-
- FileSystem fs;
- Path stagingDir;
-
- ////////////////////////////////////////////
- // Create Output Directory
- ////////////////////////////////////////////
-
- String outputPath = context.get(QueryVars.OUTPUT_TABLE_PATH, "");
- if (context.isCreateTable() || context.isInsert()) {
- if (outputPath == null || outputPath.isEmpty()) {
- // hbase
- stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
- } else {
- stagingDir = StorageUtil.concatPath(context.getOutputPath(), TMP_STAGING_DIR_PREFIX, queryId);
- }
- } else {
- stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
- }
-
- // initializ
- fs = stagingDir.getFileSystem(conf);
-
- if (fs.exists(stagingDir)) {
- throw new IOException("The staging directory '" + stagingDir + "' already exists");
- }
- fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
- FileStatus fsStatus = fs.getFileStatus(stagingDir);
- String owner = fsStatus.getOwner();
-
- if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) {
- throw new IOException("The ownership on the user's query " +
- "directory " + stagingDir + " is not as expected. " +
- "It is owned by " + owner + ". The directory must " +
- "be owned by the submitter " + currentUser + " or " +
- "by " + realUser);
- }
-
- if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) {
- LOG.info("Permissions on staging directory " + stagingDir + " are " +
- "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
- "to correct value " + STAGING_DIR_PERMISSION);
- fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
- }
-
- Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
- fs.mkdirs(stagingResultDir);
-
- return stagingDir;
- }
-
- /**
-  * @return the query of this task; null until {@code startQuery} has created it
-  */
- public Query getQuery() {
-   return this.query;
- }
-
- /**
-  * Kills the query unless it is already in a terminal state or waiting for a kill.
-  * Does nothing if the query has not been created (for instance after a failed start).
-  */
- protected void expireQuerySession() {
-   Query current = query;
-   if (current == null) {
-     return;
-   }
-
-   // Read the state once so that both checks see the same value.
-   QueryState state = current.getState();
-   if (!isTerminatedState(state) && state != QueryState.QUERY_KILL_WAIT) {
-     current.handle(new QueryEvent(queryId, QueryEventType.KILL));
-   }
- }
-
- /**
-  * @return the context object created for this task in {@code init}
-  */
- public QueryMasterTaskContext getQueryTaskContext() {
-   return this.queryTaskContext;
- }
-
- public EventHandler getEventHandler() {
- return queryTaskContext.getEventHandler();
- }
-
- public void touchSessionTime() {
- this.lastClientHeartbeat.set(System.currentTimeMillis());
- }
-
- /**
-  * @return the time in milliseconds stored by the last {@link #touchSessionTime()} call
-  */
- public long getLastClientHeartbeat() {
-   return lastClientHeartbeat.get();
- }
-
- /**
-  * @return the id of the query handled by this task
-  */
- public QueryId getQueryId() {
-   return this.queryId;
- }
-
- /**
-  * @return true if an error was recorded in {@code initError}
-  */
- public boolean isInitError() {
-   return !(initError == null);
- }
-
- /**
-  * Returns the state of the query. While no query object exists the result is QUERY_ERROR if
-  * an init error was recorded and QUERY_NOT_ASSIGNED otherwise.
-  *
-  * @return the current query state
-  */
- public QueryState getState() {
-   if (query != null) {
-     return query.getState();
-   }
-   return isInitError() ? QueryState.QUERY_ERROR : QueryState.QUERY_NOT_ASSIGNED;
- }
-
- /**
-  * @return the recorded initialization error, or null if none was recorded
-  */
- public Throwable getInitError() {
-   return this.initError;
- }
-
- /**
-  * @return the stringified init error, or null if no init error was recorded
-  */
- public String getErrorMessage() {
-   if (!isInitError()) {
-     return null;
-   }
-   return StringUtils.stringifyException(initError);
- }
-
- /**
-  * @return the submit time of the query as stored in {@code querySubmitTime}
-  */
- public long getQuerySubmitTime() {
-   return querySubmitTime;
- }
-
- public class QueryMasterTaskContext {
- EventHandler eventHandler;
- public QueryMaster.QueryMasterContext getQueryMasterContext() {
- return queryMasterContext;
- }
-
- public Session getSession() {
- return session;
- }
-
- public QueryContext getQueryContext() {
- return queryContext;
- }
-
- public TajoConf getConf() {
- return systemConf;
- }
-
- public Clock getClock() {
- return queryMasterContext.getClock();
- }
-
- public Query getQuery() {
- return query;
- }
-
- public QueryId getQueryId() {
- return queryId;
- }
-
- public Path getStagingDir() {
- return queryContext.getStagingDir();
- }
-
- public synchronized EventHandler getEventHandler() {
- if(eventHandler == null) {
- eventHandler = dispatcher.getEventHandler();
- }
- return eventHandler;
- }
-
- public AsyncDispatcher getDispatcher() {
- return dispatcher;
- }
-
- public Stage getStage(ExecutionBlockId id) {
- return query.getStage(id);
- }
-
- public Map<String, TableDesc> getTableDescMap() {
- return tableDescMap;
- }
-
- public float getProgress() {
- if(query == null) {
- return 0.0f;
- }
- return query.getProgress();
- }
-
- public AbstractResourceAllocator getResourceAllocator() {
- return resourceAllocator;
- }
-
- public TajoMetrics getQueryMetrics() {
- return queryMetrics;
- }
- }
-}
\ No newline at end of file