You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/08/26 14:29:14 UTC
[5/8] TAJO-127: Implement Tajo Resource Manager. (hyoungjunkim via
hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
new file mode 100644
index 0000000..b2c129f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Event type handled by the dispatcher of {@link QueryJobManager}. Each event carries the
+ * {@link QueryInfo} of the query it refers to.
+ */
+public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> {
+  /** Query information attached to this event; assigned once in the constructor. */
+  private final QueryInfo queryInfo;
+
+  /**
+   * @param type      kind of query job event
+   * @param queryInfo query information delivered together with the event
+   */
+  public QueryJobEvent(Type type, QueryInfo queryInfo) {
+    super(type);
+
+    this.queryInfo = queryInfo;
+  }
+
+  /**
+   * @return the query information that was passed to the constructor
+   */
+  public QueryInfo getQueryInfo() {
+    return this.queryInfo;
+  }
+
+  /** Kinds of events that can be expressed with a {@link QueryJobEvent}. */
+  public enum Type {
+    QUERY_JOB_START,
+    QUERY_JOB_HEARTBEAT,
+    QUERY_JOB_FINISH,
+    QUERY_MASTER_START,
+    QUERY_MASTER_STOP
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
new file mode 100644
index 0000000..e4d83af
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.engine.planner.logical.LogicalRootNode;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.rm.WorkerResource;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Keeps track of the queries known to the TajoMaster. For every new query a
+ * {@link QueryInProgress} is created and registered as running; {@link QueryJobEvent}s are routed
+ * to the matching {@link QueryInProgress} through an {@link AsyncDispatcher}; stopped queries are
+ * moved from the running map to the finished map.
+ */
+public class QueryJobManager extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName());
+
+  // TajoMaster Context
+  private final TajoMaster.MasterContext masterContext;
+
+  private AsyncDispatcher dispatcher;
+
+  // Every access to the two maps below happens inside synchronized(runningQueries).
+  // They are final so that the object used as the monitor can never be replaced.
+  private final Map<QueryId, QueryInProgress> runningQueries = new HashMap<QueryId, QueryInProgress>();
+
+  private final Map<QueryId, QueryInProgress> finishedQueries = new HashMap<QueryId, QueryInProgress>();
+
+  /**
+   * @param masterContext context of the TajoMaster this manager belongs to
+   */
+  public QueryJobManager(final TajoMaster.MasterContext masterContext) {
+    super(QueryJobManager.class.getName());
+    this.masterContext = masterContext;
+  }
+
+  /**
+   * Creates the dispatcher, adds it as a child service and registers the handler for
+   * {@link QueryJobEvent.Type} events before initializing the composite service.
+   *
+   * @param conf configuration handed on to the child services
+   * @throws RuntimeException wrapping the original failure when the dispatcher cannot be set up
+   */
+  @Override
+  public void init(Configuration conf) {
+    try {
+      this.dispatcher = new AsyncDispatcher();
+      addService(this.dispatcher);
+
+      this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler());
+    } catch (Exception e) {
+      // No query exists yet, so the failure is only logged here and then propagated with its
+      // cause intact instead of being replaced by a NullPointerException.
+      catchException(null, e);
+      throw new RuntimeException(e);
+    }
+
+    super.init(conf);
+  }
+
+  /**
+   * Stops every query that is still registered as running, then stops the child services.
+   */
+  @Override
+  public void stop() {
+    synchronized(runningQueries) {
+      for(QueryInProgress eachQueryInProgress: runningQueries.values()) {
+        eachQueryInProgress.stop();
+      }
+    }
+    super.stop();
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  /**
+   * @return the event handler of the internal dispatcher
+   */
+  public EventHandler getEventHandler() {
+    return dispatcher.getEventHandler();
+  }
+
+  /**
+   * Creates a {@link QueryInProgress} for the given query, registers it as running, initializes
+   * and starts it, and asks it to start its query master.
+   *
+   * @param sql  query string
+   * @param plan logical plan of the query
+   * @return the query information of the newly created {@link QueryInProgress}
+   * @throws Exception declared by the original signature; propagated from the calls made here
+   */
+  public QueryInfo createNewQueryJob(String sql, LogicalRootNode plan) throws Exception {
+    QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
+    QueryInProgress queryInProgress = new QueryInProgress(masterContext, queryId, sql, plan);
+
+    synchronized(runningQueries) {
+      runningQueries.put(queryId, queryInProgress);
+    }
+
+    addService(queryInProgress);
+    queryInProgress.init(getConfig());
+    queryInProgress.start();
+
+    queryInProgress.startQueryMaster();
+
+    return queryInProgress.getQueryInfo();
+  }
+
+  /**
+   * Forwards each {@link QueryJobEvent} to the event handler of the running query it refers to.
+   * Events for queries that are not registered as running are logged and dropped.
+   */
+  class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
+    @Override
+    public void handle(QueryJobEvent event) {
+      QueryInProgress queryInProgress = null;
+      synchronized(runningQueries) {
+        queryInProgress = runningQueries.get(event.getQueryInfo().getQueryId());
+        if(queryInProgress == null) {
+          LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
+          return;
+        }
+      }
+      // Handled outside the monitor so the lock is not held while the query processes the event.
+      queryInProgress.getEventHandler().handle(event);
+    }
+  }
+
+  /**
+   * @param queryId id of the query to look up
+   * @return the running query registered under queryId, or null when there is none
+   */
+  public QueryInProgress getQueryInProgress(QueryId queryId) {
+    synchronized(runningQueries) {
+      return runningQueries.get(queryId);
+    }
+  }
+
+  /**
+   * Stops the running query with the given id and moves it from the running map to the
+   * finished map. Logs a warning when no such running query exists.
+   *
+   * @param queryId id of the query to stop
+   */
+  public void stopQuery(QueryId queryId) {
+    LOG.info("====>Stop QueryInProgress:" + queryId);
+    QueryInProgress queryInProgress = getQueryInProgress(queryId);
+    if(queryInProgress != null) {
+      queryInProgress.stop();
+      synchronized(runningQueries) {
+        runningQueries.remove(queryId);
+        finishedQueries.put(queryId, queryInProgress);
+      }
+    } else {
+      LOG.warn("====> No QueryInProgress while query stopping: " + queryId);
+    }
+  }
+
+  /**
+   * Logs the exception and, when a running query is registered under queryId, hands the
+   * exception to that query.
+   *
+   * @param queryId id of the affected query; may be null when the failure is not tied to a query
+   * @param e       the failure to report
+   */
+  private void catchException(QueryId queryId, Exception e) {
+    LOG.error(e.getMessage(), e);
+    if(queryId == null) {
+      return;
+    }
+    // Look the query up under the same monitor as every other access to runningQueries.
+    QueryInProgress queryInProgress = getQueryInProgress(queryId);
+    if(queryInProgress != null) {
+      queryInProgress.catchException(e);
+    }
+  }
+
+  /**
+   * Turns a heartbeat into a QUERY_JOB_HEARTBEAT event for the query it names. Heartbeats for
+   * queries that are not registered as running are ignored.
+   *
+   * @param queryHeartbeat heartbeat message received for a query
+   * @return always null in this implementation; no response command is produced
+   */
+  public synchronized TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat(
+      TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
+    QueryInProgress queryInProgress = getQueryInProgress(new QueryId(queryHeartbeat.getQueryId()));
+    if(queryInProgress == null) {
+      return null;
+    }
+
+    QueryInfo queryInfo = makeQueryInfoFromHeartbeat(queryHeartbeat);
+    getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_HEARTBEAT, queryInfo));
+
+    return null;
+  }
+
+  /**
+   * Builds a {@link QueryInfo} from the query id, worker host and ports, status message and
+   * state contained in the heartbeat.
+   *
+   * @param queryHeartbeat heartbeat message to convert
+   * @return a new {@link QueryInfo} populated from the heartbeat
+   */
+  private QueryInfo makeQueryInfoFromHeartbeat(TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
+    QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId()));
+    // NOTE(review): protobuf-generated getters normally never return null, so this check may
+    // always be true; hasTajoWorkerHost() might be the intended test - confirm against the .proto.
+    if(queryHeartbeat.getTajoWorkerHost() != null) {
+      WorkerResource queryMasterResource = new WorkerResource();
+      queryMasterResource.setAllocatedHost(queryHeartbeat.getTajoWorkerHost());
+      queryMasterResource.setPorts(new int[]{queryHeartbeat.getTajoWorkerPort(), queryHeartbeat.getTajoWorkerClientPort()});
+
+      queryInfo.setQueryMasterResource(queryMasterResource);
+    }
+    queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
+    queryInfo.setQueryState(queryHeartbeat.getState());
+
+    return queryInfo;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index b66ef68..50ec5be 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -18,142 +18,82 @@
package org.apache.tajo.master.querymaster;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.SystemClock;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.proto.YarnProtos;
-import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
-import org.apache.tajo.*;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.QueryConf;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
import org.apache.tajo.engine.planner.global.GlobalOptimizer;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.LogicalNode;
-import org.apache.tajo.engine.planner.logical.LogicalRootNode;
-import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.ipc.QueryMasterManagerProtocol;
-import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.master.*;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.rm.RMContainerAllocator;
-import org.apache.tajo.rpc.ProtoAsyncRpcServer;
-import org.apache.tajo.rpc.ProtoBlockingRpcClient;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.GlobalPlanner;
+import org.apache.tajo.master.TajoAsyncDispatcher;
+import org.apache.tajo.master.event.QueryStartEvent;
+import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.worker.TajoWorker;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicBoolean;
-// TODO - when exception, send error status to QueryMasterManager
+// TODO - when exception, send error status to QueryJobManager
public class QueryMaster extends CompositeService implements EventHandler {
private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName());
- private static PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
- private static PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
+
private static int QUERY_SESSION_TIMEOUT = 60 * 1000; //60 sec
- // AppMaster Common
- private final long appSubmitTime;
private Clock clock;
- // For Query
- private final QueryId queryId;
- private QueryContext queryContext;
- private Query query;
- private TajoProtos.QueryState state = TajoProtos.QueryState.QUERY_NOT_ASSIGNED;
- private String statusMessage;
- private MasterPlan masterPlan;
-
- private AsyncDispatcher dispatcher;
- private RMContainerAllocator rmAllocator;
+ private TajoAsyncDispatcher dispatcher;
- //service handler for QueryMasterManager, Worker
- private QueryMasterService queryMasterService;
- private QueryMasterClientService queryMasterClientService;
-
- private TaskRunnerLauncher taskRunnerLauncher;
private GlobalPlanner globalPlanner;
+
private GlobalOptimizer globalOptimizer;
- private boolean isCreateTableStmt;
+// private boolean isCreateTableStmt;
private StorageManager storageManager;
- private Path outputPath;
+
private QueryConf queryConf;
- private ApplicationAttemptId appAttemptId;
- private ApplicationId appId;
- private ProtoBlockingRpcClient queryMasterManagerClient;
- private QueryMasterManagerProtocol.QueryMasterManagerProtocolService.BlockingInterface queryMasterManagerService;
- private Map<String, TableDesc> tableDescMap = new HashMap<String, TableDesc>();
+ private Map<QueryId, QueryMasterTask> queryMasterTasks = new HashMap<QueryId, QueryMasterTask>();
- private String queryMasterManagerAddress;
+ private ClientSessionTimeoutCheckThread clientSessionTimeoutCheckThread;
- private YarnRPC yarnRPC;
+ private AtomicBoolean queryMasterStop = new AtomicBoolean(false);
- private YarnClient yarnClient;
+ private QueryMasterContext queryMasterContext;
- private ClientSessionTimeoutCheckThread clientSessionTimeoutCheckThread;
+ private QueryHeartbeatThread queryHeartbeatThread;
- public QueryMaster(final QueryId queryId, final long appSubmitTime, String queryMasterManagerAddress) {
- super(QueryMaster.class.getName());
+ private TajoWorker.WorkerContext workerContext;
- this.queryId = queryId;
- this.appSubmitTime = appSubmitTime;
- this.appId = queryId.getApplicationId();
- this.queryMasterManagerAddress = queryMasterManagerAddress;
-
- LOG.info("Created Query Master for " + queryId);
+ public QueryMaster(TajoWorker.WorkerContext workerContext) {
+ super(QueryMaster.class.getName());
+ this.workerContext = workerContext;
}
public void init(Configuration conf) {
+ LOG.info("QueryMaster init");
try {
queryConf = new QueryConf(conf);
- QUERY_SESSION_TIMEOUT = 60 * 1000;//queryConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
- queryContext = new QueryContext(queryConf);
- yarnRPC = YarnRPC.create(queryContext.getConf());
- connectYarnClient();
+ queryConf.addResource(new Path(QueryConf.QUERY_MASTER_FILENAME));
- LOG.info("Init QueryMasterManagerClient connection to:" + queryMasterManagerAddress);
- InetSocketAddress addr = NetUtils.createSocketAddr(queryMasterManagerAddress);
- queryMasterManagerClient = new ProtoBlockingRpcClient(QueryMasterManagerProtocol.class, addr);
- queryMasterManagerService = queryMasterManagerClient.getStub();
+ QUERY_SESSION_TIMEOUT = 60 * 1000;//queryConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
+ queryMasterContext = new QueryMasterContext(queryConf);
clock = new SystemClock();
- this.dispatcher = new AsyncDispatcher();
+ this.dispatcher = new TajoAsyncDispatcher("querymaster_" + System.currentTimeMillis());
addIfService(dispatcher);
this.storageManager = new StorageManager(queryConf);
@@ -161,23 +101,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
globalPlanner = new GlobalPlanner(queryConf, storageManager, dispatcher.getEventHandler());
globalOptimizer = new GlobalOptimizer();
- queryMasterService = new QueryMasterService();
- addIfService(queryMasterService);
-
- queryMasterClientService = new QueryMasterClientService(queryContext);
- addIfService(queryMasterClientService);
-
- initStagingDir();
-
- dispatcher.register(SubQueryEventType.class, new SubQueryEventDispatcher());
- dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
- dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
- dispatcher.register(QueryFinishEvent.EventType.class, new QueryFinishEventHandler());
- dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher());
+ dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler());
- clientSessionTimeoutCheckThread = new ClientSessionTimeoutCheckThread();
-
- clientSessionTimeoutCheckThread.start();
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
throw new RuntimeException(t);
@@ -185,364 +110,88 @@ public class QueryMaster extends CompositeService implements EventHandler {
super.init(conf);
}
- class ClientSessionTimeoutCheckThread extends Thread {
- public void run() {
- LOG.info("ClientSessionTimeoutCheckThread started");
- while(true) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- break;
- }
- try {
- long lastHeartbeat = queryContext.getLastClientHeartbeat();
- long time = System.currentTimeMillis() - lastHeartbeat;
- if(lastHeartbeat > 0 && time > QUERY_SESSION_TIMEOUT) {
- LOG.warn("Query " + queryId + " stopped cause query sesstion timeout: " + time + " ms");
- QueryMaster.this.stop();
- }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
- }
- }
-
- class QueryHeartbeatThread extends Thread {
- public QueryHeartbeatThread() {
- super("QueryHeartbeatThread");
- }
-
- @Override
- public void run() {
- LOG.info("Start QueryMaster heartbeat thread");
- while(queryMasterManagerClient.isConnected()) {
- QueryMasterManagerProtocol.QueryHeartbeat queryHeartbeat =
- QueryMasterManagerProtocol.QueryHeartbeat.newBuilder()
- .setQueryMasterHost(queryMasterService.bindAddr.getHostName())
- .setQueryMasterPort(queryMasterService.bindAddr.getPort())
- .setQueryMasterClientPort(queryMasterClientService.getBindAddr().getPort())
- .setState(state)
- .setQueryId(queryId.getProto())
- .build();
-
- try {
- QueryMasterManagerProtocol.QueryHeartbeatResponse response =
- queryMasterManagerService.queryHeartbeat(null, queryHeartbeat);
- if(response.getResponseCommand() != null) {
- if("executeQuery".equals(response.getResponseCommand().getCommand())) {
- appAttemptId = TajoIdUtils.toApplicationAttemptId(response.getResponseCommand().getParams(0));
- startQuery(response.getResponseCommand().getParams(1),
- response.getResponseCommand().getParams(2));
- }
- }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- break;
- }
- }
- LOG.info("QueryMaster heartbeat thread stopped");
- }
- }
-
- // TODO blocking/nonblocking ???
- class QueryMasterService extends AbstractService implements QueryMasterProtocol.QueryMasterProtocolService.Interface {
- private ProtoAsyncRpcServer rpcServer;
- private InetSocketAddress bindAddr;
- private String addr;
- private QueryHeartbeatThread queryHeartbeatThread;
-
- public QueryMasterService() {
- super(QueryMasterService.class.getName());
-
- // Setup RPC server
- try {
- InetSocketAddress initIsa =
- new InetSocketAddress(InetAddress.getLocalHost(), 0);
- if (initIsa.getAddress() == null) {
- throw new IllegalArgumentException("Failed resolve of " + initIsa);
- }
-
- this.rpcServer = new ProtoAsyncRpcServer(QueryMasterProtocol.class, this, initIsa);
- this.rpcServer.start();
-
- this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
- this.addr = NetUtils.normalizeInetSocketAddress(this.bindAddr);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- queryConf.setVar(TajoConf.ConfVars.TASKRUNNER_LISTENER_ADDRESS, addr);
- LOG.info("QueryMasterService startup");
- }
-
- @Override
- public void init(Configuration conf) {
- super.init(conf);
- }
-
- @Override
- public void start() {
- try {
- queryHeartbeatThread = new QueryHeartbeatThread();
- queryHeartbeatThread.start();
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- // TODO - set query status failed and stop QueryMaster
- }
- super.start();
- }
-
- @Override
- public void stop() {
- if(rpcServer != null) {
- rpcServer.shutdown();
- }
- if(queryHeartbeatThread != null) {
- queryHeartbeatThread.interrupt();
- }
- if(yarnClient != null) {
- yarnClient.stop();
- }
- if(clientSessionTimeoutCheckThread != null) {
- clientSessionTimeoutCheckThread.interrupt();
- }
- super.stop();
- LOG.info("QueryMasterService stopped");
- }
-
- @Override
- public void getTask(RpcController controller, YarnProtos.ContainerIdProto request,
- RpcCallback<QueryMasterProtocol.QueryUnitRequestProto> done) {
- queryContext.getEventHandler().handle(new TaskRequestEvent(new ContainerIdPBImpl(request), done));
- }
-
- @Override
- public void statusUpdate(RpcController controller, QueryMasterProtocol.TaskStatusProto request,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- QueryUnitAttemptId attemptId = new QueryUnitAttemptId(request.getId());
- queryContext.getEventHandler().handle(new TaskAttemptStatusUpdateEvent(attemptId, request));
- done.run(TRUE_PROTO);
- }
-
- @Override
- public void ping(RpcController controller,
- TajoIdProtos.QueryUnitAttemptIdProto attemptIdProto,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- // TODO - to be completed
- QueryUnitAttemptId attemptId = new QueryUnitAttemptId(attemptIdProto);
- done.run(TRUE_PROTO);
- }
-
- @Override
- public void fatalError(RpcController controller, QueryMasterProtocol.TaskFatalErrorReport report,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- queryContext.getEventHandler().handle(new TaskFatalErrorEvent(report));
- done.run(TRUE_PROTO);
- }
+ @Override
+ public void start() {
+ LOG.info("====>QueryMaster start");
- @Override
- public void done(RpcController controller, QueryMasterProtocol.TaskCompletionReport report,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- queryContext.getEventHandler().handle(new TaskCompletionEvent(report));
- done.run(TRUE_PROTO);
- }
+ queryHeartbeatThread = new QueryHeartbeatThread();
+ queryHeartbeatThread.start();
- @Override
- public void executeQuery(RpcController controller, PrimitiveProtos.StringProto request,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- }
- }
+ clientSessionTimeoutCheckThread = new ClientSessionTimeoutCheckThread();
+ clientSessionTimeoutCheckThread.start();
- public void start() {
super.start();
}
+ @Override
public void stop() {
- LOG.info("unregisterApplicationMaster");
- if(rmAllocator != null) {
- try {
- FinalApplicationStatus status = FinalApplicationStatus.UNDEFINED;
- if (query != null) {
- TajoProtos.QueryState state = query.getState();
- if (state == TajoProtos.QueryState.QUERY_SUCCEEDED) {
- status = FinalApplicationStatus.SUCCEEDED;
- } else if (state == TajoProtos.QueryState.QUERY_FAILED || state == TajoProtos.QueryState.QUERY_ERROR) {
- status = FinalApplicationStatus.FAILED;
- } else if (state == TajoProtos.QueryState.QUERY_ERROR) {
- status = FinalApplicationStatus.FAILED;
- }
- }
- this.rmAllocator.unregisterApplicationMaster(status, "tajo query finished", null);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ synchronized(queryMasterStop) {
+ if(queryMasterStop.get()) {
+ return;
}
- }
- // TODO - release opened resource
- if(this.queryMasterManagerClient != null) {
- reportQueryStatus();
-
- queryMasterManagerClient.close();
+ queryMasterStop.set(true);
+ queryMasterStop.notifyAll();
}
- try {
- FileSystem.closeAll();
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
+ if(queryHeartbeatThread != null) {
+ queryHeartbeatThread.interrupt();
}
+ if(clientSessionTimeoutCheckThread != null) {
+ clientSessionTimeoutCheckThread.interrupt();
+ }
super.stop();
- synchronized(queryId) {
- queryId.notifyAll();
+ LOG.info("QueryMaster stop");
+ if(!queryMasterContext.getWorkerContext().isStandbyMode()) {
+ queryMasterContext.getWorkerContext().stopWorker(true);
}
}
- private void reportQueryStatus() {
- //send query status heartbeat
- QueryMasterManagerProtocol.QueryHeartbeat.Builder queryHeartbeatBuilder =
- QueryMasterManagerProtocol.QueryHeartbeat.newBuilder()
- .setQueryMasterHost(queryMasterService.bindAddr.getHostName())
- .setQueryMasterPort(queryMasterService.bindAddr.getPort())
- .setQueryMasterClientPort(queryMasterClientService.getBindAddr().getPort())
- .setState(state)
- .setQueryId(queryId.getProto());
-
- if(statusMessage != null) {
- queryHeartbeatBuilder.setStatusMessage(statusMessage);
- }
+ public void reportQueryStatusToQueryMaster(QueryId queryId, TajoProtos.QueryState state) {
+ LOG.info("Send QueryMaster Ready to QueryJobManager:" + queryId);
try {
- queryMasterManagerService.queryHeartbeat(null, queryHeartbeatBuilder.build());
+ TajoMasterProtocol.TajoHeartbeat.Builder queryHeartbeatBuilder = TajoMasterProtocol.TajoHeartbeat.newBuilder()
+ .setTajoWorkerHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName())
+ .setTajoWorkerPort(workerContext.getTajoWorkerManagerService().getBindAddr().getPort())
+ .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
+ .setState(state)
+ .setQueryId(queryId.getProto());
+
+ workerContext.getTajoMasterRpcClient().heartbeat(null, queryHeartbeatBuilder.build(), NullCallback.get());
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}
- private void connectYarnClient() {
- this.yarnClient = new YarnClientImpl();
- this.yarnClient.init(queryConf);
- this.yarnClient.start();
- }
-
protected void addIfService(Object object) {
if (object instanceof Service) {
addService((Service) object);
}
}
- public synchronized void startQuery(String queryStr, String planJSON) {
- LOG.info("Query Start:" + queryStr);
- LOG.info("Plan JSON:" + planJSON);
- if(query != null) {
- LOG.warn("Query already started");
- return;
- }
-
- try {
- LogicalRootNode logicalNodeRoot = (LogicalRootNode) CoreGsonHelper.fromJson(planJSON, LogicalNode.class);
- LogicalNode[] scanNodes = PlannerUtil.findAllNodes(logicalNodeRoot, NodeType.SCAN);
- if(scanNodes != null) {
- for(LogicalNode eachScanNode: scanNodes) {
- ScanNode scanNode = (ScanNode)eachScanNode;
- tableDescMap.put(scanNode.getFromTable().getTableName(), scanNode.getFromTable().getTableDesc());
- }
- }
- MasterPlan globalPlan = globalPlanner.build(queryId, logicalNodeRoot);
- this.masterPlan = globalOptimizer.optimize(globalPlan);
-
- taskRunnerLauncher = new TaskRunnerLauncherImpl(queryContext);
- addIfService(taskRunnerLauncher);
- dispatcher.register(TaskRunnerGroupEvent.EventType.class, taskRunnerLauncher);
-
- ((TaskRunnerLauncherImpl)taskRunnerLauncher).init(queryConf);
- ((TaskRunnerLauncherImpl)taskRunnerLauncher).start();
-
- rmAllocator = new RMContainerAllocator(queryContext);
- addIfService(rmAllocator);
- dispatcher.register(ContainerAllocatorEventType.class, rmAllocator);
-
- rmAllocator.init(queryConf);
- rmAllocator.start();
-
- //TODO - synchronized with executeQuery logic
- query = new Query(queryContext, queryId, clock, appSubmitTime,
- "", dispatcher.getEventHandler(), masterPlan, storageManager);
- dispatcher.register(QueryEventType.class, query);
-
- dispatcher.getEventHandler().handle(new QueryEvent(queryId,
- QueryEventType.INIT));
- dispatcher.getEventHandler().handle(new QueryEvent(queryId,
- QueryEventType.START));
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- //send FAIL query status
- this.statusMessage = StringUtils.stringifyException(e);
- this.state = TajoProtos.QueryState.QUERY_FAILED;
- }
- }
-
@Override
public void handle(Event event) {
dispatcher.getEventHandler().handle(event);
}
- public EventHandler getEventHandler() {
- return dispatcher.getEventHandler();
- }
-
- private class SubQueryEventDispatcher implements EventHandler<SubQueryEvent> {
- public void handle(SubQueryEvent event) {
- SubQueryId id = event.getSubQueryId();
- query.getSubQuery(id).handle(event);
- }
+ public Query getQuery(QueryId queryId) {
+ return queryMasterTasks.get(queryId).getQuery();
}
- private class TaskEventDispatcher
- implements EventHandler<TaskEvent> {
- public void handle(TaskEvent event) {
- QueryUnitId taskId = event.getTaskId();
- QueryUnit task = query.getSubQuery(taskId.getSubQueryId()).
- getQueryUnit(taskId);
- task.handle(event);
- }
- }
-
- private class TaskAttemptEventDispatcher
- implements EventHandler<TaskAttemptEvent> {
- public void handle(TaskAttemptEvent event) {
- QueryUnitAttemptId attemptId = event.getTaskAttemptId();
- SubQuery subQuery = query.getSubQuery(attemptId.getSubQueryId());
- QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
- QueryUnitAttempt attempt = task.getAttempt(attemptId);
- attempt.handle(event);
- }
- }
-
- private class TaskSchedulerDispatcher
- implements EventHandler<TaskSchedulerEvent> {
- public void handle(TaskSchedulerEvent event) {
- SubQuery subQuery = query.getSubQuery(event.getSubQueryId());
- subQuery.getTaskScheduler().handle(event);
- }
+  /**
+   * @param queryId id of the query to look up
+   * @return the QueryMasterTask registered under queryId, or null when there is none
+   */
+  public QueryMasterTask getQueryMasterTask(QueryId queryId) {
+    // queryMasterTasks is a plain HashMap whose entries are removed under this monitor,
+    // so the lookup takes the same monitor.
+    synchronized(queryMasterTasks) {
+      return queryMasterTasks.get(queryId);
+    }
   }
- public QueryContext getContext() {
- return this.queryContext;
+ public QueryMasterContext getContext() {
+ return this.queryMasterContext;
}
- public class QueryContext {
+ public class QueryMasterContext {
private QueryConf conf;
- public Map<ContainerId, ContainerProxy> containers = new ConcurrentHashMap<ContainerId, ContainerProxy>();
- int minCapability;
- int maxCapability;
- int numCluster;
- AtomicLong lastClientHeartbeat = new AtomicLong(-1);
- public QueryContext(QueryConf conf) {
+ public QueryMasterContext(QueryConf conf) {
this.conf = conf;
}
@@ -550,15 +199,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
return conf;
}
- public InetSocketAddress getQueryMasterServiceAddress() {
- return queryMasterService.bindAddr;
- }
-
- public QueryMasterClientService getQueryMasterClientService() {
- return queryMasterClientService;
- }
-
- public AsyncDispatcher getDispatcher() {
+ public TajoAsyncDispatcher getDispatcher() {
return dispatcher;
}
@@ -566,251 +207,133 @@ public class QueryMaster extends CompositeService implements EventHandler {
return clock;
}
- public Query getQuery() {
- return query;
+ public StorageManager getStorageManager() {
+ return storageManager;
}
- public SubQuery getSubQuery(SubQueryId subQueryId) {
- return query.getSubQuery(subQueryId);
+ public QueryMaster getQueryMaster() {
+ return QueryMaster.this;
}
- public QueryId getQueryId() {
- return queryId;
+ public GlobalPlanner getGlobalPlanner() {
+ return globalPlanner;
}
-
- public ApplicationId getApplicationId() {
- return appId;
+ public GlobalOptimizer getGlobalOptimizer() {
+ return globalOptimizer;
}
- public ApplicationAttemptId getApplicationAttemptId() {
- return appAttemptId;
+ public TajoWorker.WorkerContext getWorkerContext() {
+ return workerContext;
}
public EventHandler getEventHandler() {
return dispatcher.getEventHandler();
}
- public void addContainer(ContainerId cId, ContainerProxy container) {
- containers.put(cId, container);
- }
-
- public void removeContainer(ContainerId cId) {
- containers.remove(cId);
- }
-
- public boolean containsContainer(ContainerId cId) {
- return containers.containsKey(cId);
- }
-
- public ContainerProxy getContainer(ContainerId cId) {
- return containers.get(cId);
- }
-
- public Map<ContainerId, ContainerProxy> getContainers() {
- return containers;
- }
-
- public int getNumClusterNode() {
- return numCluster;
- }
-
- public void setNumClusterNodes(int num) {
- numCluster = num;
- }
-
-// public CatalogService getCatalog() {
-// return catalog;
-// }
-
- public Map<String, TableDesc> getTableDescMap() {
- return tableDescMap;
- }
-
- public Path getOutputPath() {
- return outputPath;
- }
-
- public void setMaxContainerCapability(int capability) {
- this.maxCapability = capability;
- }
-
- public int getMaxContainerCapability() {
- return this.maxCapability;
- }
-
- public void setMinContainerCapability(int capability) {
- this.minCapability = capability;
- }
-
- public int getMinContainerCapability() {
- return this.minCapability;
- }
-
- public boolean isCreateTableQuery() {
- return isCreateTableStmt;
- }
-
- public float getProgress() {
- if(query != null) {
- return query.getProgress();
- } else {
- return 0;
+ public void stopQuery(QueryId queryId) {
+ QueryMasterTask queryMasterTask;
+ synchronized(queryMasterTasks) {
+ queryMasterTask = queryMasterTasks.remove(queryId);
}
- }
-
- public long getStartTime() {
- if(query != null) {
- return query.getStartTime();
+ if(queryMasterTask != null) {
+ try {
+ queryMasterTask.stop();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
} else {
- return -1;
+ LOG.warn("No query info:" + queryId);
}
- }
-
- public long getFinishTime() {
- if(query != null) {
- return query.getFinishTime();
- } else {
- return -1;
+ if(!workerContext.isStandbyMode()) {
+ stop();
}
}
-
- public StorageManager getStorageManager() {
- return storageManager;
- }
-
- public QueryMaster getQueryMaster() {
- return QueryMaster.this;
- }
-
- public YarnRPC getYarnRPC() {
- return yarnRPC;
- }
-
- public void setState(TajoProtos.QueryState state) {
- QueryMaster.this.state = state;
- }
-
- public TajoProtos.QueryState getState() {
- return state;
- }
-
- public void touchSessionTime() {
- this.lastClientHeartbeat.set(System.currentTimeMillis());
- }
-
- public long getLastClientHeartbeat() {
- return this.lastClientHeartbeat.get();
- }
}
- private class QueryFinishEventHandler implements EventHandler<QueryFinishEvent> {
+ private class QueryStartEventHandler implements EventHandler<QueryStartEvent> {
@Override
- public void handle(QueryFinishEvent event) {
- LOG.info("Query end notification started for QueryId : " + query.getId() + "," + query.getState());
-
- //QueryMaster must be lived until client fetching all query result data.
- try {
- // Stop all services
- // This will also send the final report to the ResourceManager
- //LOG.info("Calling stop for all the services");
-// stop();
- } catch (Throwable t) {
- LOG.warn("Graceful stop failed ", t);
+ public void handle(QueryStartEvent event) {
+ LOG.info("====>Start QueryStartEventHandler:" + event.getQueryId());
+ // Create a QueryMasterTask for this query, init and start it, then register it in queryMasterTasks.
+ QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext,
+ event.getQueryId(), event.getLogicalPlanJson());
+
+ queryMasterTask.init(queryConf);
+ queryMasterTask.start();
+ synchronized(queryMasterTasks) {
+ queryMasterTasks.put(event.getQueryId(), queryMasterTask);
}
-
- //Bring the process down by force.
- //Not needed after HADOOP-7140
- //LOG.info("Exiting QueryMaster..GoodBye!");
}
}
- // query submission directory is private!
- final public static FsPermission USER_DIR_PERMISSION =
- FsPermission.createImmutable((short) 0700); // rwx--------
-
- /**
- * It initializes the final output and staging directory and sets
- * them to variables.
- */
- private void initStagingDir() throws IOException {
- QueryConf conf = getContext().getConf();
-
- String realUser;
- String currentUser;
- UserGroupInformation ugi;
- ugi = UserGroupInformation.getLoginUser();
- realUser = ugi.getShortUserName();
- currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
-
- String givenOutputTableName = conf.getOutputTable();
- Path stagingDir;
-
- // If final output directory is not given by an user,
- // we use the query id as a output directory.
- if (givenOutputTableName.equals("")) {
- this.isCreateTableStmt = false;
- FileSystem defaultFS = FileSystem.get(conf);
-
- Path homeDirectory = defaultFS.getHomeDirectory();
- if (!defaultFS.exists(homeDirectory)) {
- defaultFS.mkdirs(homeDirectory, new FsPermission(USER_DIR_PERMISSION));
- }
-
- Path userQueryDir = new Path(homeDirectory, TajoConstants.USER_QUERYDIR_PREFIX);
-
- if (defaultFS.exists(userQueryDir)) {
- FileStatus fsStatus = defaultFS.getFileStatus(userQueryDir);
- String owner = fsStatus.getOwner();
+ class QueryHeartbeatThread extends Thread {
+ public QueryHeartbeatThread() {
+ super("QueryHeartbeatThread");
+ }
- if (!(owner.equals(currentUser) || owner.equals(realUser))) {
- throw new IOException("The ownership on the user's query " +
- "directory " + userQueryDir + " is not as expected. " +
- "It is owned by " + owner + ". The directory must " +
- "be owned by the submitter " + currentUser + " or " +
- "by " + realUser);
+ @Override
+ public void run() {
+ LOG.info("Start QueryMaster heartbeat thread");
+ while(!queryMasterStop.get()) {
+ //TODO report all query status
+ List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>();
+ synchronized(queryMasterTasks) {
+ tempTasks.addAll(queryMasterTasks.values());
}
+ synchronized(queryMasterTasks) {
+ for(QueryMasterTask eachTask: tempTasks) {
+ TajoMasterProtocol.TajoHeartbeat queryHeartbeat = TajoMasterProtocol.TajoHeartbeat.newBuilder()
+ .setTajoWorkerHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName())
+ .setTajoWorkerPort(workerContext.getTajoWorkerManagerService().getBindAddr().getPort())
+ .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
+ .setState(eachTask.getState())
+ .setQueryId(eachTask.getQueryId().getProto())
+ .build();
- if (!fsStatus.getPermission().equals(USER_DIR_PERMISSION)) {
- LOG.info("Permissions on staging directory " + userQueryDir + " are " +
- "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
- "to correct value " + USER_DIR_PERMISSION);
- defaultFS.setPermission(userQueryDir, new FsPermission(USER_DIR_PERMISSION));
+ workerContext.getTajoMasterRpcClient().heartbeat(null, queryHeartbeat, NullCallback.get());
+ }
+ }
+ synchronized(queryMasterStop) {
+ try {
+ queryMasterStop.wait(2000);
+ } catch (InterruptedException e) {
+ break;
+ }
}
- } else {
- defaultFS.mkdirs(userQueryDir,
- new FsPermission(USER_DIR_PERMISSION));
- }
-
- stagingDir = StorageUtil.concatPath(userQueryDir, queryId.toString());
-
- if (defaultFS.exists(stagingDir)) {
- throw new IOException("The staging directory " + stagingDir
- + "already exists. The directory must be unique to each query");
- } else {
- defaultFS.mkdirs(stagingDir, new FsPermission(USER_DIR_PERMISSION));
}
+ LOG.info("QueryMaster heartbeat thread stopped");
+ }
+ }
- // Set the query id to the output table name
- conf.setOutputTable(queryId.toString());
- } else {
- this.isCreateTableStmt = true;
- Path warehouseDir = new Path(conf.getVar(TajoConf.ConfVars.ROOT_DIR),
- TajoConstants.WAREHOUSE_DIR);
- stagingDir = new Path(warehouseDir, conf.getOutputTable());
+ class ClientSessionTimeoutCheckThread extends Thread {
+ public void run() {
+ LOG.info("ClientSessionTimeoutCheckThread started");
+ while(true) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ break;
+ }
+ List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>();
+ synchronized(queryMasterTasks) {
+ tempTasks.addAll(queryMasterTasks.values());
+ }
- FileSystem fs = warehouseDir.getFileSystem(conf);
- if (fs.exists(stagingDir)) {
- throw new IOException("The staging directory " + stagingDir
- + " already exists. The directory must be unique to each query");
- } else {
- // TODO - should have appropriate permission
- fs.mkdirs(stagingDir, new FsPermission(USER_DIR_PERMISSION));
+ for(QueryMasterTask eachTask: tempTasks) {
+ try {
+ long lastHeartbeat = eachTask.getLastClientHeartbeat();
+ long time = System.currentTimeMillis() - lastHeartbeat;
+ if(lastHeartbeat > 0 && time > QUERY_SESSION_TIMEOUT) {
+ LOG.warn("Query " + eachTask.getQueryId() + " stopped cause query sesstion timeout: " + time + " ms");
+ eachTask.expiredSessionTimeout();
+ }
+ } catch (Exception e) {
+ LOG.error(eachTask.getQueryId() + ":" + e.getMessage(), e);
+ }
+ }
}
}
-
- conf.setOutputPath(stagingDir);
- outputPath = stagingDir;
- LOG.info("Initialized Query Staging Dir: " + outputPath);
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java
deleted file mode 100644
index 74298e5..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.proto.YarnProtos;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.ipc.ClientProtos;
-import org.apache.tajo.ipc.QueryMasterClientProtocol;
-import org.apache.tajo.rpc.ProtoBlockingRpcServer;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.TajoIdUtils;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-public class QueryMasterClientService extends AbstractService {
- private static final Log LOG = LogFactory.getLog(QueryMasterClientService.class);
- private final PrimitiveProtos.BoolProto BOOL_TRUE =
- PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
-
- private ProtoBlockingRpcServer rpcServer;
- private InetSocketAddress bindAddr;
- private String addr;
- private QueryMaster.QueryContext queryContext;
- private QueryMasterClientProtocolServiceHandler serviceHandler;
-
- public QueryMasterClientService(QueryMaster.QueryContext queryContext) {
- super(QueryMasterClientService.class.getName());
-
- this.queryContext = queryContext;
- this.serviceHandler = new QueryMasterClientProtocolServiceHandler();
-
- // init RPC Server in constructor cause Heartbeat Thread use bindAddr
- // Setup RPC server
- try {
- // TODO initial port num is value of config and find unused port with sequence
- InetSocketAddress initIsa = new InetSocketAddress(InetAddress.getLocalHost(), 0);
- if (initIsa.getAddress() == null) {
- throw new IllegalArgumentException("Failed resolve of " + initIsa);
- }
-
- // TODO blocking/non-blocking??
- this.rpcServer = new ProtoBlockingRpcServer(QueryMasterClientProtocol.class, serviceHandler, initIsa);
- this.rpcServer.start();
-
- this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
- this.addr = NetUtils.normalizeInetSocketAddress(bindAddr);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- // Get the master address
- LOG.info(QueryMasterClientService.class.getSimpleName() + " (" + queryContext.getQueryId() + ") listens on "
- + addr);
- }
-
- @Override
- public void init(Configuration conf) {
- super.init(conf);
- }
-
- @Override
- public void start() {
- super.start();
- }
-
- @Override
- public void stop() {
- if(rpcServer != null) {
- rpcServer.shutdown();
- }
- LOG.info("QueryMasterClientService stopped");
- super.stop();
- }
-
- public InetSocketAddress getBindAddr() {
- return bindAddr;
- }
-
-
- public class QueryMasterClientProtocolServiceHandler
- implements QueryMasterClientProtocol.QueryMasterClientProtocolService.BlockingInterface {
- @Override
- public PrimitiveProtos.BoolProto updateSessionVariables(
- RpcController controller,
- ClientProtos.UpdateSessionVariableRequest request) throws ServiceException {
- return null;
- }
-
- @Override
- public ClientProtos.GetQueryResultResponse getQueryResult(
- RpcController controller,
- ClientProtos.GetQueryResultRequest request) throws ServiceException {
- QueryId queryId = new QueryId(request.getQueryId());
- Query query = queryContext.getQuery();
-
- ClientProtos.GetQueryResultResponse.Builder builder = ClientProtos.GetQueryResultResponse.newBuilder();
-
- if(query == null) {
- builder.setErrorMessage("No Query for " + queryId);
- } else {
- switch (query.getState()) {
- case QUERY_SUCCEEDED:
- builder.setTableDesc((CatalogProtos.TableDescProto)query.getResultDesc().getProto());
- break;
- case QUERY_FAILED:
- case QUERY_ERROR:
- builder.setErrorMessage("Query " + queryId + " is failed");
- default:
- builder.setErrorMessage("Query " + queryId + " is still running");
- }
- }
- return builder.build();
- }
-
- @Override
- public ClientProtos.GetQueryStatusResponse getQueryStatus(
- RpcController controller,
- ClientProtos.GetQueryStatusRequest request) throws ServiceException {
- ClientProtos.GetQueryStatusResponse.Builder builder
- = ClientProtos.GetQueryStatusResponse.newBuilder();
- QueryId queryId = new QueryId(request.getQueryId());
- builder.setQueryId(request.getQueryId());
-
- if (queryId.equals(TajoIdUtils.NullQueryId)) {
- builder.setResultCode(ClientProtos.ResultCode.OK);
- builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
- } else {
- Query query = queryContext.getQuery();
- builder.setResultCode(ClientProtos.ResultCode.OK);
- builder.setQueryMasterHost(queryContext.getQueryMasterClientService().getBindAddr().getHostName());
- builder.setQueryMasterPort(queryContext.getQueryMasterClientService().getBindAddr().getPort());
-
-
- queryContext.touchSessionTime();
- if (query != null) {
- builder.setState(query.getState());
- builder.setProgress(query.getProgress());
- builder.setSubmitTime(query.getAppSubmitTime());
- builder.setInitTime(query.getInitializationTime());
- builder.setHasResult(!query.isCreateTableStmt());
- if (query.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
- builder.setFinishTime(query.getFinishTime());
- } else {
- builder.setFinishTime(System.currentTimeMillis());
- }
- } else {
- builder.setState(queryContext.getState());
- }
- }
-
- return builder.build();
- }
-
- @Override
- public PrimitiveProtos.BoolProto killQuery(
- RpcController controller,
- YarnProtos.ApplicationAttemptIdProto request) throws ServiceException {
- LOG.info("Stop QueryMaster:" + queryContext.getQueryId());
- Thread t = new Thread() {
- public void run() {
- try {
- Thread.sleep(1000); //wait tile return to rpc response
- } catch (InterruptedException e) {
- }
- queryContext.getQueryMaster().stop();
- }
- };
- t.start();
- return BOOL_TRUE;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java
deleted file mode 100644
index 35d7201..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java
+++ /dev/null
@@ -1,353 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.service.CompositeService;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.engine.planner.logical.LogicalRootNode;
-import org.apache.tajo.ipc.QueryMasterManagerProtocol;
-import org.apache.tajo.ipc.QueryMasterManagerProtocol.QueryHeartbeatResponse;
-import org.apache.tajo.master.ContainerProxy;
-import org.apache.tajo.master.TajoMaster;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-// TODO - check QueryMaster status and if QueryMaster failed, release resource
-public class QueryMasterManager extends CompositeService {
- private static final Log LOG = LogFactory.getLog(QueryMasterManager.class.getName());
-
- // Master Context
- private final TajoMaster.MasterContext masterContext;
-
- // AppMaster Common
- private final Clock clock;
- private final long appSubmitTime;
- private final ApplicationId appId;
- private ApplicationAttemptId appAttemptId;
-
- protected YarnClient yarnClient;
-
- // For Query
- private final QueryId queryId;
-
- private AsyncDispatcher dispatcher;
- private YarnRPC rpc;
-
- private TajoProtos.QueryState state;
- private float progress;
- private long finishTime;
- private TableDesc resultDesc;
- private String queryMasterHost;
- private int queryMasterPort;
- private int queryMasterClientPort;
-
- private LogicalRootNode plan;
-
- private AtomicBoolean querySubmitted = new AtomicBoolean(false);
-
- private AtomicBoolean queryMasterStopped = new AtomicBoolean(true);
-
- private boolean stopCheckThreadStarted = false;
-
- private String query;
-
- public QueryMasterManager(final TajoMaster.MasterContext masterContext,
- final YarnClient yarnClient,
- final QueryId queryId,
- final String query,
- final LogicalRootNode plan,
- final ApplicationId appId,
- final Clock clock, long appSubmitTime) {
- super(QueryMasterManager.class.getName());
- this.masterContext = masterContext;
- this.yarnClient = yarnClient;
-
- this.appId = appId;
- this.clock = clock;
- this.appSubmitTime = appSubmitTime;
- this.queryId = queryId;
- this.plan = plan;
- this.query = query;
- LOG.info("Created Query Master Manager for AppId=" + appId + ", QueryID=" + queryId);
- }
-
- @Override
- public void init(Configuration conf) {
- super.init(conf);
-
- state = TajoProtos.QueryState.QUERY_MASTER_INIT;
- }
-
- public TajoProtos.QueryState getState() {
- return state;
- }
-
- @Override
- public void start() {
- try {
- appAttemptId = allocateAndLaunchQueryMaster();
- } catch (YarnRemoteException e) {
- LOG.error(e.getMessage(), e);
- }
- super.start();
- }
-
- @Override
- public void stop() {
- while(true) {
- if(queryMasterStopped.get()) {
- break;
- }
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- }
- }
- LOG.info("QueryMasterManager for " + queryId + " stopped");
- super.stop();
- }
-
- public float getProgress() {
- return progress;
- }
-
- public long getAppSubmitTime() {
- return appSubmitTime;
- }
-
- public long getFinishTime() {
- return finishTime;
- }
-
- public TableDesc getResultDesc() {
- return resultDesc;
- }
-
- public String getQueryMasterHost() {
- return queryMasterHost;
- }
-
- public int getQueryMasterPort() {
- return queryMasterPort;
- }
-
- public int getQueryMasterClientPort() {
- return queryMasterClientPort;
- }
-
- public synchronized QueryHeartbeatResponse.ResponseCommand queryHeartbeat(QueryMasterManagerProtocol.QueryHeartbeat queryHeartbeat) {
- this.queryMasterHost = queryHeartbeat.getQueryMasterHost();
- this.queryMasterPort = queryHeartbeat.getQueryMasterPort();
- this.queryMasterClientPort = queryHeartbeat.getQueryMasterClientPort();
- this.state = queryHeartbeat.getState();
- if(state == TajoProtos.QueryState.QUERY_FAILED) {
- //TODO needed QueryMaster's detail status(failed before or after launching worker)
- queryMasterStopped.set(true);
- if(queryHeartbeat.getStatusMessage() != null) {
- LOG.warn(queryId + " failed, " + queryHeartbeat.getStatusMessage());
- }
- }
-
- if(!stopCheckThreadStarted && !queryMasterStopped.get() && isFinishState(this.state)) {
- stopCheckThreadStarted = true;
- startCheckingQueryMasterStop();
- }
- if(appAttemptId != null && !querySubmitted.get()) {
- LOG.info("submitQuery to QueryMaster(" + queryMasterHost + ":" + queryMasterPort + ")");
- queryMasterStopped.set(false);
- querySubmitted.set(true);
- List<String> params = new ArrayList<String>(3);
- params.add(appAttemptId.toString());
- params.add(query);
- params.add(plan.toJson());
- return QueryHeartbeatResponse.ResponseCommand.newBuilder()
- .setCommand("executeQuery")
- .addAllParams(params)
- .build();
- } else {
- return null;
- }
- }
-
- private boolean isFinishState(TajoProtos.QueryState state) {
- return state == TajoProtos.QueryState.QUERY_FAILED ||
- state == TajoProtos.QueryState.QUERY_KILLED ||
- state == TajoProtos.QueryState.QUERY_SUCCEEDED;
- }
-
- private void startCheckingQueryMasterStop() {
- Thread t = new Thread() {
- public void run() {
- try {
- ApplicationReport report = monitorApplication(appId,
- EnumSet.of(
- YarnApplicationState.FINISHED,
- YarnApplicationState.KILLED,
- YarnApplicationState.FAILED));
- queryMasterStopped.set(true);
- LOG.info("QueryMaster (" + queryId + ") stopped");
- } catch (YarnRemoteException e) {
- LOG.error(e.getMessage(), e);
- }
- }
- };
-
- t.start();
- }
-
- private ApplicationAttemptId allocateAndLaunchQueryMaster() throws YarnRemoteException {
- LOG.info("Allocate and launch QueryMaster:" + yarnClient);
- ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
-
- // set the application id
- appContext.setApplicationId(appId);
- // set the application name
- appContext.setApplicationName("Tajo");
-
- Priority pri = Records.newRecord(Priority.class);
- pri.setPriority(5);
- appContext.setPriority(pri);
-
- // Set the queue to which this application is to be submitted in the RM
- appContext.setQueue("default");
-
- ContainerLaunchContext commonContainerLaunchContext =
- ContainerProxy.createCommonContainerLaunchContext(masterContext.getConf(), queryId.toString(), true);
-
- // Setup environment by cloning from common env.
- Map<String, String> env = commonContainerLaunchContext.getEnvironment();
- Map<String, String> myEnv = new HashMap<String, String>(env.size());
- myEnv.putAll(env);
-
- ////////////////////////////////////////////////////////////////////////////
- // Set the local resources
- ////////////////////////////////////////////////////////////////////////////
- // Set the necessary command to execute the application master
- Vector<CharSequence> vargs = new Vector<CharSequence>(30);
-
- // Set java executable command
- //LOG.info("Setting up app master command");
- vargs.add("${JAVA_HOME}" + "/bin/java");
- // Set Xmx based on am memory size
- vargs.add("-Xmx2000m");
- // Set Remote Debugging
- //if (!context.getQuery().getSubQuery(event.getSubQueryId()).isLeafQuery()) {
- //vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
- //}
- // Set class name
- vargs.add(QueryMasterRunner.class.getCanonicalName());
- vargs.add(queryId.toString()); // queryId
- vargs.add(String.valueOf(appSubmitTime));
- vargs.add(masterContext.getQueryMasterManagerService().getBindAddress().getHostName() + ":" +
- masterContext.getQueryMasterManagerService().getBindAddress().getPort());
-
- vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
- vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
-
- // Get final commmand
- StringBuilder command = new StringBuilder();
- for (CharSequence str : vargs) {
- command.append(str).append(" ");
- }
-
- LOG.info("Completed setting up QueryMasterRunner command " + command.toString());
- List<String> commands = new ArrayList<String>();
- commands.add(command.toString());
-
- final Resource resource = Records.newRecord(Resource.class);
- // TODO - get default value from conf
- resource.setMemory(2000);
- resource.setVirtualCores(1);
-
- Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
-
- ContainerLaunchContext masterContainerContext = BuilderUtils.newContainerLaunchContext(
- null, commonContainerLaunchContext.getUser(),
- resource, commonContainerLaunchContext.getLocalResources(), myEnv, commands,
- myServiceData, null, new HashMap<ApplicationAccessType, String>(2));
-
- appContext.setAMContainerSpec(masterContainerContext);
-
- LOG.info("Submitting QueryMaster to ResourceManager");
- yarnClient.submitApplication(appContext);
-
- ApplicationReport appReport = monitorApplication(appId, EnumSet.of(YarnApplicationState.ACCEPTED));
- ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
-
- LOG.info("Launching QueryMaster with id: " + attemptId);
-
- state = TajoProtos.QueryState.QUERY_MASTER_LAUNCHED;
-
- return attemptId;
- }
-
- private ApplicationReport monitorApplication(ApplicationId appId,
- Set<YarnApplicationState> finalState) throws YarnRemoteException {
-
- long sleepTime = 100;
- int count = 1;
- while (true) {
- // Get application report for the appId we are interested in
- ApplicationReport report = yarnClient.getApplicationReport(appId);
-
- LOG.info("Got application report from ASM for" + ", appId="
- + appId.getId() + ", appAttemptId="
- + report.getCurrentApplicationAttemptId() + ", clientToken="
- + report.getClientToken() + ", appDiagnostics="
- + report.getDiagnostics() + ", appMasterHost=" + report.getHost()
- + ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
- + report.getRpcPort() + ", appStartTime=" + report.getStartTime()
- + ", yarnAppState=" + report.getYarnApplicationState().toString()
- + ", distributedFinalState="
- + report.getFinalApplicationStatus().toString() + ", appTrackingUrl="
- + report.getTrackingUrl() + ", appUser=" + report.getUser());
-
- YarnApplicationState state = report.getYarnApplicationState();
- if (finalState.contains(state)) {
- return report;
- }
- try {
- Thread.sleep(sleepTime);
- sleepTime = count * 100;
- if(count < 10) {
- count++;
- }
- } catch (InterruptedException e) {
- //LOG.debug("Thread sleep in monitoring loop interrupted");
- }
-
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
deleted file mode 100644
index 65f237c..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.QueryMasterManagerProtocol;
-import org.apache.tajo.ipc.QueryMasterManagerProtocol.QueryHeartbeat;
-import org.apache.tajo.ipc.QueryMasterManagerProtocol.QueryHeartbeatResponse;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.rpc.ProtoBlockingRpcServer;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
-import org.apache.tajo.util.NetUtils;
-
-import java.net.InetSocketAddress;
-
-import static org.apache.tajo.conf.TajoConf.ConfVars;
-
-public class QueryMasterManagerService extends AbstractService {
- private final static Log LOG = LogFactory.getLog(QueryMasterManagerService.class);
-
- private final TajoMaster.MasterContext context;
- private final TajoConf conf;
- private final QueryMasterManagerProtocolServiceHandler masterHandler;
- private ProtoBlockingRpcServer server;
- private InetSocketAddress bindAddress;
-
- private final BoolProto BOOL_TRUE = BoolProto.newBuilder().setValue(true).build();
- private final BoolProto BOOL_FALSE = BoolProto.newBuilder().setValue(false).build();
-
- public QueryMasterManagerService(TajoMaster.MasterContext context) {
- super(QueryMasterManagerService.class.getName());
- this.context = context;
- this.conf = context.getConf();
- this.masterHandler = new QueryMasterManagerProtocolServiceHandler();
- }
-
- @Override
- public void start() {
- // TODO resolve hostname
- String confMasterServiceAddr = conf.getVar(ConfVars.QUERY_MASTER_MANAGER_SERVICE_ADDRESS);
- InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
- try {
- server = new ProtoBlockingRpcServer(QueryMasterManagerProtocol.class, masterHandler, initIsa);
- } catch (Exception e) {
- LOG.error(e);
- }
- server.start();
- bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
- this.conf.setVar(ConfVars.QUERY_MASTER_MANAGER_SERVICE_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
- LOG.info("QueryMasterManagerService startup");
- super.start();
- }
-
- @Override
- public void stop() {
- if(server != null) {
- server.shutdown();
- server = null;
- }
- LOG.info("QueryMasterManagerService shutdown");
- super.stop();
- }
-
- public InetSocketAddress getBindAddress() {
- return bindAddress;
- }
-
- public class QueryMasterManagerProtocolServiceHandler implements QueryMasterManagerProtocol.QueryMasterManagerProtocolService.BlockingInterface {
- @Override
- public QueryHeartbeatResponse queryHeartbeat(RpcController controller, QueryHeartbeat request) throws ServiceException {
- // TODO - separate QueryMasterManagerProtocol, ClientServiceProtocol
- QueryId queryId = new QueryId(request.getQueryId());
- if(LOG.isDebugEnabled()) {
- LOG.debug("Received QueryHeartbeat:" + queryId + "," + request);
- }
- QueryMasterManager queryMasterManager = context.getQuery(queryId);
- if (queryMasterManager == null) {
- LOG.warn("No query:" + queryId);
- return QueryHeartbeatResponse.newBuilder().setHeartbeatResult(BOOL_FALSE).build();
- }
-
- QueryHeartbeatResponse.ResponseCommand command = queryMasterManager.queryHeartbeat(request);
-
- //ApplicationAttemptId attemptId = queryMasterManager.getAppAttemptId();
- //String attemptIdStr = attemptId == null ? null : attemptId.toString();
- QueryHeartbeatResponse.Builder builder = QueryHeartbeatResponse.newBuilder();
- builder.setHeartbeatResult(BOOL_TRUE);
- if(command != null) {
- builder.setResponseCommand(command);
- }
- return builder.build();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
index daab9fd..5a79464 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
@@ -39,13 +39,11 @@ public class QueryMasterRunner extends AbstractService {
private QueryConf queryConf;
private QueryMaster queryMaster;
private QueryId queryId;
- private long appSubmitTime;
private String queryMasterManagerAddress;
- public QueryMasterRunner(QueryId queryId, long appSubmitTime, String queryMasterManagerAddress) {
+ public QueryMasterRunner(QueryId queryId, String queryMasterManagerAddress) {
super(QueryMasterRunner.class.getName());
this.queryId = queryId;
- this.appSubmitTime = appSubmitTime;
this.queryMasterManagerAddress = queryMasterManagerAddress;
}
@@ -72,7 +70,7 @@ public class QueryMasterRunner extends AbstractService {
@Override
public void start() {
//create QueryMaster
- QueryMaster query = new QueryMaster(queryId, appSubmitTime, queryMasterManagerAddress);
+ QueryMaster query = new QueryMaster(null);
query.init(queryConf);
query.start();
@@ -90,13 +88,12 @@ public class QueryMasterRunner extends AbstractService {
UserGroupInformation.setConfiguration(conf);
- final QueryId queryId = TajoIdUtils.createQueryId(args[0]);
- final long appSubmitTime = Long.parseLong(args[1]);
- final String queryMasterManagerAddr = args[2];
+ final QueryId queryId = TajoIdUtils.parseQueryId(args[0]);
+ final String queryMasterManagerAddr = args[1];
LOG.info("Received QueryId:" + queryId);
- QueryMasterRunner queryMasterRunner = new QueryMasterRunner(queryId, appSubmitTime, queryMasterManagerAddr);
+ QueryMasterRunner queryMasterRunner = new QueryMasterRunner(queryId, queryMasterManagerAddr);
queryMasterRunner.init(conf);
queryMasterRunner.start();