You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/01/09 15:34:18 UTC
[2/8] tajo git commit: TAJO-1282: Clean up the relationship of
QueryInProgress and QueryJobManager.
TAJO-1282: Clean up the relationship of QueryInProgress and QueryJobManager.
Closes #334
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/307c6c9f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/307c6c9f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/307c6c9f
Branch: refs/heads/index_support
Commit: 307c6c9f558ae96ffa2f7351ffa3dc5788ba4dbb
Parents: 809cba3
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Jan 9 14:47:47 2015 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Jan 9 14:47:47 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../org/apache/tajo/master/QueryJobManager.java | 59 +++++++------
.../apache/tajo/master/TajoMasterService.java | 7 --
.../master/rm/TajoWorkerResourceManager.java | 2 +-
.../tajo/master/rm/WorkerResourceManager.java | 2 +-
.../master/scheduler/SimpleFifoScheduler.java | 3 +-
.../tajo/querymaster/QueryInProgress.java | 87 ++------------------
.../apache/tajo/querymaster/QueryJobEvent.java | 5 +-
.../apache/tajo/querymaster/QueryMaster.java | 2 +-
.../tajo/querymaster/QueryMasterTask.java | 47 +----------
.../src/main/proto/TajoMasterProtocol.proto | 1 -
.../apache/tajo/querymaster/TestKillQuery.java | 2 +-
12 files changed, 53 insertions(+), 167 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 369dbda..35670d7 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,9 @@ Release 0.9.1 - unreleased
IMPROVEMENT
+ TAJO-1282: Cleanup the relationship of QueryInProgress and
+ QueryJobManager. (hyunsik)
+
TAJO-1258: Close() for classes derived from FileAppender should be robust.
(Jongyoung Park via jinho)
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
index c9b8711..6a8da27 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
@@ -28,22 +28,25 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TajoProtos;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.scheduler.SimpleFifoScheduler;
+import org.apache.tajo.plan.logical.LogicalRootNode;
import org.apache.tajo.querymaster.QueryInProgress;
import org.apache.tajo.querymaster.QueryJobEvent;
import org.apache.tajo.session.Session;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.master.scheduler.SimpleFifoScheduler;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+/**
+ * QueryJobManager manages all scheduled and running queries.
+ * It receives all Query related events and routes them to each QueryInProgress.
+ */
public class QueryJobManager extends CompositeService {
private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName());
@@ -69,7 +72,7 @@ public class QueryJobManager extends CompositeService {
}
@Override
- public void init(Configuration conf) {
+ public void serviceInit(Configuration conf) throws Exception {
try {
this.dispatcher = new AsyncDispatcher();
addService(this.dispatcher);
@@ -81,24 +84,24 @@ public class QueryJobManager extends CompositeService {
catchException(null, e);
}
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
- public void stop() {
+ public void serviceStop() throws Exception {
synchronized(runningQueries) {
for(QueryInProgress eachQueryInProgress: runningQueries.values()) {
- eachQueryInProgress.stop();
+ eachQueryInProgress.stopProgress();
}
}
this.scheduler.stop();
- super.stop();
+ super.serviceStop();
}
@Override
- public void start() {
+ public void serviceStart() throws Exception {
this.scheduler.start();
- super.start();
+ super.serviceStart();
}
public EventHandler getEventHandler() {
@@ -164,39 +167,42 @@ public class QueryJobManager extends CompositeService {
runningQueries.put(queryInProgress.getQueryId(), queryInProgress);
}
- addService(queryInProgress);
- queryInProgress.init(getConfig());
- queryInProgress.start();
-
- if (!queryInProgress.startQueryMaster()) {
+ if (queryInProgress.startQueryMaster()) {
+ dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START,
+ queryInProgress.getQueryInfo()));
+ } else {
stopQuery(queryId);
}
return queryInProgress.getQueryInfo();
}
- public TajoMaster.MasterContext getMasterContext() {
- return masterContext;
- }
-
class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
+
@Override
public void handle(QueryJobEvent event) {
QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId());
- if(queryInProgress == null) {
+
+
+ if (queryInProgress == null) {
LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
return;
}
- if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
+
+ if (event.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
+ queryInProgress.submmitQueryToMaster();
+
+ } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
stopQuery(event.getQueryInfo().getQueryId());
- } else if (queryInProgress.isStarted()) {
- queryInProgress.getEventHandler().handle(event);
+
} else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
scheduler.removeQuery(queryInProgress.getQueryId());
- queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
-
+ queryInProgress.kill();
stopQuery(queryInProgress.getQueryId());
+
+ } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
+ queryInProgress.heartbeat(event.getQueryInfo());
}
}
}
@@ -219,7 +225,7 @@ public class QueryJobManager extends CompositeService {
LOG.info("Stop QueryInProgress:" + queryId);
QueryInProgress queryInProgress = getQueryInProgress(queryId);
if(queryInProgress != null) {
- queryInProgress.stop();
+ queryInProgress.stopProgress();
synchronized(submittedQueries) {
submittedQueries.remove(queryId);
}
@@ -245,7 +251,6 @@ public class QueryJobManager extends CompositeService {
avgExecutionTime.set(executionTime);
}
executedQuerySize.incrementAndGet();
- removeService(queryInProgress);
} else {
LOG.warn("No QueryInProgress while query stopping: " + queryId);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
index a7df206..02bdfa1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -136,13 +136,6 @@ public class TajoMasterService extends AbstractService {
}
@Override
- public void stopQueryMaster(RpcController controller, TajoIdProtos.QueryIdProto request,
- RpcCallback<BoolProto> done) {
- context.getQueryJobManager().stopQuery(new QueryId(request));
- done.run(BOOL_TRUE);
- }
-
- @Override
public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request,
RpcCallback<TajoMasterProtocol.WorkerResourcesRequest> done) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index c4200d5..9f2a3d5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -526,7 +526,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
}
@Override
- public void stopQueryMaster(QueryId queryId) {
+ public void releaseQueryMaster(QueryId queryId) {
if(!rmContext.getQueryMasterContainer().containsKey(queryId)) {
LOG.warn("No QueryMaster resource info for " + queryId);
return;
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index b237cc5..79ec0ac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -80,7 +80,7 @@ public interface WorkerResourceManager extends Service {
*
* @param queryId QueryId to be stopped
*/
- public void stopQueryMaster(QueryId queryId);
+ public void releaseQueryMaster(QueryId queryId);
/**
*
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
index bd8ca28..a091ed5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
@@ -58,7 +58,8 @@ public class SimpleFifoScheduler implements Scheduler {
LOG.info("Size of Fifo queue is " + qSize);
}
- QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1, queryInProgress.getStartTime());
+ QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1,
+ queryInProgress.getQueryInfo().getStartTime());
boolean result = pool.add(querySchedulingInfo);
if (getRunningQueries().size() == 0) wakeupProcessor();
return result;
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
index bda2ec1..f83f244 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
@@ -20,11 +20,7 @@ package org.apache.tajo.querymaster;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.engine.query.QueryContext;
@@ -35,12 +31,12 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
import org.apache.tajo.master.QueryInfo;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.rm.WorkerResourceManager;
-import org.apache.tajo.session.Session;
import org.apache.tajo.plan.logical.LogicalRootNode;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.session.Session;
import org.apache.tajo.util.NetUtils;
import java.net.InetSocketAddress;
@@ -48,15 +44,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
-public class QueryInProgress extends CompositeService {
+public class QueryInProgress {
private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
private QueryId queryId;
private Session session;
- private AsyncDispatcher dispatcher;
-
private LogicalRootNode plan;
private AtomicBoolean querySubmitted = new AtomicBoolean(false);
@@ -76,7 +70,7 @@ public class QueryInProgress extends CompositeService {
Session session,
QueryContext queryContext,
QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) {
- super(QueryInProgress.class.getName());
+
this.masterContext = masterContext;
this.session = session;
this.queryId = queryId;
@@ -86,23 +80,14 @@ public class QueryInProgress extends CompositeService {
queryInfo.setStartTime(System.currentTimeMillis());
}
- @Override
- public void init(Configuration conf) {
- dispatcher = new AsyncDispatcher();
- this.addService(dispatcher);
-
- dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler());
- super.init(conf);
- }
-
public synchronized void kill() {
+ getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
if(queryMasterRpcClient != null){
queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
}
}
- @Override
- public void stop() {
+ public void stopProgress() {
if(stopped.getAndSet(true)) {
return;
}
@@ -110,52 +95,15 @@ public class QueryInProgress extends CompositeService {
LOG.info("=========================================================");
LOG.info("Stop query:" + queryId);
- masterContext.getResourceManager().stopQueryMaster(queryId);
-
- long startTime = System.currentTimeMillis();
- while(true) {
- try {
- if(masterContext.getResourceManager().isQueryMasterStopped(queryId)) {
- LOG.info(queryId + " QueryMaster stopped");
- break;
- }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- break;
- }
-
- try {
- synchronized (this){
- wait(100);
- }
- } catch (InterruptedException e) {
- break;
- }
- if(System.currentTimeMillis() - startTime > 60 * 1000) {
- LOG.warn("Failed to stop QueryMaster:" + queryId);
- break;
- }
- }
+ masterContext.getResourceManager().releaseQueryMaster(queryId);
if(queryMasterRpc != null) {
RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc);
}
masterContext.getHistoryWriter().appendHistory(queryInfo);
- super.stop();
- }
-
- @Override
- public void start() {
- super.start();
}
- public EventHandler getEventHandler() {
- return dispatcher.getEventHandler();
- }
-
-
-
public boolean startQueryMaster() {
try {
LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
@@ -173,8 +121,6 @@ public class QueryInProgress extends CompositeService {
queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort());
- getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo));
-
return true;
} catch (Exception e) {
catchException(e);
@@ -182,23 +128,6 @@ public class QueryInProgress extends CompositeService {
}
}
- class QueryInProgressEventHandler implements EventHandler<QueryJobEvent> {
- @Override
- public void handle(QueryJobEvent queryJobEvent) {
- if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
- heartbeat(queryJobEvent.getQueryInfo());
- } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
- QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId);
- queryInProgress.getEventHandler().handle(
- new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInProgress.getQueryInfo()));
- } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_START) {
- submmitQueryToMaster();
- } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
- kill();
- }
- }
- }
-
private void connectQueryMaster() throws Exception {
InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
LOG.info("Connect to QueryMaster:" + addr);
@@ -207,7 +136,7 @@ public class QueryInProgress extends CompositeService {
queryMasterRpcClient = queryMasterRpc.getStub();
}
- private synchronized void submmitQueryToMaster() {
+ public synchronized void submmitQueryToMaster() {
if(querySubmitted.get()) {
return;
}
@@ -256,7 +185,7 @@ public class QueryInProgress extends CompositeService {
return !stopped.get() && this.querySubmitted.get();
}
- private void heartbeat(QueryInfo queryInfo) {
+ public void heartbeat(QueryInfo queryInfo) {
LOG.info("Received QueryMaster heartbeat:" + queryInfo);
// to avoid partial update by different heartbeats
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
index 1a1f2ff..27eb2b6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
@@ -35,12 +35,9 @@ public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> {
}
public enum Type {
- QUERY_JOB_START,
+ QUERY_MASTER_START,
QUERY_JOB_HEARTBEAT,
- QUERY_JOB_FINISH,
QUERY_JOB_STOP,
- QUERY_MASTER_START,
- QUERY_MASTER_STOP,
QUERY_JOB_KILL
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index 76df397..02760a3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -473,7 +473,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
public void handle(QueryStartEvent event) {
LOG.info("Start QueryStartEventHandler:" + event.getQueryId());
QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext,
- event.getQueryId(), event.getSession(), event.getQueryContext(), event.getJsonExpr(), event.getLogicalPlanJson());
+ event.getQueryId(), event.getSession(), event.getQueryContext(), event.getJsonExpr());
synchronized(queryMasterTasks) {
queryMasterTasks.put(event.getQueryId(), queryMasterTask);
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index bab5903..fd52488 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@ -41,13 +41,10 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.UnimplementedException;
-import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.TajoContainerProxy;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
-import org.apache.tajo.session.Session;
import org.apache.tajo.plan.LogicalOptimizer;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
@@ -58,8 +55,7 @@ import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.verifier.VerifyException;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.session.Session;
import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.storage.StorageProperty;
import org.apache.tajo.storage.StorageUtil;
@@ -96,12 +92,8 @@ public class QueryMasterTask extends CompositeService {
private Query query;
- private MasterPlan masterPlan;
-
private String jsonExpr;
- private String logicalPlanJson;
-
private AsyncDispatcher dispatcher;
private final long querySubmitTime;
@@ -124,8 +116,7 @@ public class QueryMasterTask extends CompositeService {
new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>();
public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
- QueryId queryId, Session session, QueryContext queryContext, String jsonExpr,
- String logicalPlanJson) {
+ QueryId queryId, Session session, QueryContext queryContext, String jsonExpr) {
super(QueryMasterTask.class.getName());
this.queryMasterContext = queryMasterContext;
@@ -133,7 +124,6 @@ public class QueryMasterTask extends CompositeService {
this.session = session;
this.queryContext = queryContext;
this.jsonExpr = jsonExpr;
- this.logicalPlanJson = logicalPlanJson;
this.querySubmitTime = System.currentTimeMillis();
}
@@ -198,42 +188,11 @@ public class QueryMasterTask extends CompositeService {
LOG.fatal(t.getMessage(), t);
}
- RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf());
- NettyClientBase tmClient = null;
- try {
- // In TajoMaster HA mode, if backup master be active status,
- // worker may fail to connect existing active master. Thus,
- // if worker can't connect the master, worker should try to connect another master and
- // update master address in worker context.
- if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- try {
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
- } catch (Exception e) {
- queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
- HAServiceUtil.getResourceTrackerAddress(systemConf));
- queryMasterContext.getWorkerContext().setTajoMasterAddress(
- HAServiceUtil.getMasterUmbilicalAddress(systemConf));
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
- }
- } else {
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
- }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- } finally {
- connPool.releaseConnection(tmClient);
- }
-
- super.stop();
-
- //TODO change report to tajo master
if (queryMetrics != null) {
queryMetrics.report(new MetricsConsoleReporter());
}
+ super.stop();
LOG.info("Stopped QueryMasterTask:" + queryId);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto
index cc83e47..bc73596 100644
--- a/tajo-core/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/src/main/proto/TajoMasterProtocol.proto
@@ -143,6 +143,5 @@ service TajoMasterProtocolService {
rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse);
rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse);
rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto);
- rpc stopQueryMaster(QueryIdProto) returns (BoolProto);
rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index a125196..bd899cd 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -90,7 +90,7 @@ public class TestKillQuery {
QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(),
- queryId, session, defaultContext, expr.toJson(), plan.getRootBlock().getRoot().toJson());
+ queryId, session, defaultContext, expr.toJson());
queryMasterTask.init(conf);
queryMasterTask.getQueryTaskContext().getDispatcher().start();