You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/05/06 11:14:37 UTC
tajo git commit: TAJO-1584: Remove QueryMaster client sharing in TajoMaster and TajoWorker.
Repository: tajo
Updated Branches:
refs/heads/master b6b9d4631 -> 04167bdc3
TAJO-1584: Remove QueryMaster client sharing in TajoMaster and TajoWorker.
Closes #559
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/04167bdc
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/04167bdc
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/04167bdc
Branch: refs/heads/master
Commit: 04167bdc3bb04b53c5a245a9c18b6426ade82a26
Parents: b6b9d46
Author: Jinho Kim <jh...@apache.org>
Authored: Wed May 6 18:13:40 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Wed May 6 18:13:40 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../java/org/apache/tajo/conf/TajoConf.java | 2 -
.../org/apache/tajo/master/QueryInProgress.java | 31 ++---
.../querymaster/QueryMasterManagerService.java | 135 ++++++++-----------
.../tajo/worker/ExecutionBlockContext.java | 32 +++--
.../java/org/apache/tajo/worker/TajoWorker.java | 1 +
.../tajo/worker/TajoWorkerManagerService.java | 2 +
.../main/java/org/apache/tajo/worker/Task.java | 4 +-
.../java/org/apache/tajo/worker/TaskRunner.java | 43 +++---
.../src/main/proto/QueryMasterProtocol.proto | 14 +-
.../org/apache/tajo/rpc/NettyClientBase.java | 7 +-
.../org/apache/tajo/rpc/RpcClientManager.java | 9 ++
12 files changed, 133 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index a790655..ebd88cd 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,9 @@ Release 0.11.0 - unreleased
IMPROVEMENT
+ TAJO-1584: Remove QueryMaster client sharing in TajoMaster and TajoWorker.
+ (jinho)
+
TAJO-1563: Improve RPC error handling. (jinho)
TAJO-1311: Enable Scattered Hash Shuffle for CTAS statement. (jaehwa)
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index bfba290..46e7618 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -224,8 +224,6 @@ public class TajoConf extends Configuration {
HIVEUSEEXPLICITRCFILEHEADER("tajo.exec.rcfile.use.explicit.header", true, Validators.bool()),
// RPC --------------------------------------------------------------------
- RPC_POOL_MAX_IDLE("tajo.rpc.pool.idle.max", 10),
-
// Internal RPC Client
INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM("tajo.internal.rpc.client.worker-thread-num",
Runtime.getRuntime().availableProcessors() * 2),
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
index d2286cf..6a074a2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
+import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerAllocatedResource;
import org.apache.tajo.ipc.QueryMasterProtocol;
@@ -31,14 +32,13 @@ import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
import org.apache.tajo.master.rm.WorkerResourceManager;
import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcClientManager;
+import org.apache.tajo.rpc.*;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.session.Session;
import org.apache.tajo.util.NetUtils;
import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -92,7 +92,9 @@ public class QueryInProgress {
try {
getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
if (queryMasterRpcClient != null) {
- queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+ CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
+ queryMasterRpcClient.killQuery(callFuture.getController(), queryId.getProto(), callFuture);
+ callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
} catch (Throwable e) {
catchException("Failed to kill query " + queryId + " by exception " + e, e);
@@ -111,9 +113,7 @@ public class QueryInProgress {
masterContext.getResourceManager().releaseQueryMaster(queryId);
- if(queryMasterRpc != null) {
- RpcClientManager.cleanup(queryMasterRpc);
- }
+ RpcClientManager.cleanup(queryMasterRpc);
try {
masterContext.getHistoryWriter().appendAndFlush(queryInfo);
@@ -156,8 +156,9 @@ public class QueryInProgress {
private void connectQueryMaster() throws Exception {
InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
LOG.info("Connect to QueryMaster:" + addr);
- queryMasterRpc =
- RpcClientManager.getInstance().getClient(addr, QueryMasterProtocol.class, true);
+
+ RpcClientManager.cleanup(queryMasterRpc);
+ queryMasterRpc = RpcClientManager.getInstance().newClient(addr, QueryMasterProtocol.class, true);
queryMasterRpcClient = queryMasterRpc.getStub();
}
@@ -177,11 +178,7 @@ public class QueryInProgress {
if(queryMasterRpcClient == null) {
connectQueryMaster();
}
- if(queryMasterRpcClient == null) {
- LOG.info("No QueryMaster connection info.");
- //TODO wait
- return;
- }
+
LOG.info("Call executeQuery to :" +
queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
@@ -192,11 +189,15 @@ public class QueryInProgress {
.setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr()))
.setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build());
- queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get());
+ CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
+ queryMasterRpcClient.executeQuery(callFuture.getController(), builder.build(), callFuture);
+ callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
querySubmitted.set(true);
getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);
} catch (Exception e) {
LOG.error("Failed to submit query " + queryId + " to master by exception " + e, e);
+ catchException(e.getMessage(), e);
} finally {
writeLock.unlock();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
index 85cc553..59933a7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
@@ -115,10 +115,6 @@ public class QueryMasterManagerService extends CompositeService
return bindAddr;
}
- public String getHostAndPort() {
- return bindAddr.getHostName() + ":" + bindAddr.getPort();
- }
-
@Override
public void getTask(RpcController controller, TajoWorkerProtocol.GetTaskRequestProto request,
RpcCallback<TajoWorkerProtocol.TaskRequestProto> done) {
@@ -136,127 +132,106 @@ public class QueryMasterManagerService extends CompositeService
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
+ controller.setFailed(e.getMessage());
}
}
@Override
public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- try {
- QueryId queryId = new QueryId(request.getId().getTaskId().getExecutionBlockId().getQueryId());
- TaskAttemptId attemptId = new TaskAttemptId(request.getId());
- QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
- if (queryMasterTask == null) {
- queryMasterTask = queryMaster.getQueryMasterTask(queryId, true);
- }
- Stage sq = queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId());
- Task task = sq.getTask(attemptId.getTaskId());
- TaskAttempt attempt = task.getAttempt(attemptId.getId());
+ RpcCallback<PrimitiveProtos.NullProto> done) {
+ QueryId queryId = new QueryId(request.getId().getTaskId().getExecutionBlockId().getQueryId());
+ TaskAttemptId attemptId = new TaskAttemptId(request.getId());
+ QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
+ if (queryMasterTask == null) {
+ queryMasterTask = queryMaster.getQueryMasterTask(queryId, true);
+ }
+ Stage sq = queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId());
+ Task task = sq.getTask(attemptId.getTaskId());
+ TaskAttempt attempt = task.getAttempt(attemptId.getId());
- if(LOG.isDebugEnabled()){
- LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
- }
+ if(LOG.isDebugEnabled()){
+ LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
+ }
- if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
- LOG.warn(attemptId + " Killed");
- attempt.handle(
- new TaskAttemptEvent(new TaskAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED));
- } else {
- queryMasterTask.getEventHandler().handle(
- new TaskAttemptStatusUpdateEvent(new TaskAttemptId(request.getId()), request));
- }
- done.run(TajoWorker.TRUE_PROTO);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- done.run(TajoWorker.FALSE_PROTO);
+ if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
+ LOG.warn(attemptId + " Killed");
+ attempt.handle(
+ new TaskAttemptEvent(new TaskAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED));
+ } else {
+ queryMasterTask.getEventHandler().handle(
+ new TaskAttemptStatusUpdateEvent(new TaskAttemptId(request.getId()), request));
}
+
+ done.run(TajoWorker.NULL_PROTO);
}
@Override
public void ping(RpcController controller,
TajoIdProtos.ExecutionBlockIdProto requestProto,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- done.run(TajoWorker.TRUE_PROTO);
+ RpcCallback<PrimitiveProtos.NullProto> done) {
+ done.run(TajoWorker.NULL_PROTO);
}
@Override
public void fatalError(RpcController controller, TajoWorkerProtocol.TaskFatalErrorReport report,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- try {
- QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
- new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
- if (queryMasterTask != null) {
- queryMasterTask.handleTaskFailed(report);
- } else {
- LOG.warn("No QueryMasterTask: " + new TaskAttemptId(report.getId()));
- }
- done.run(TajoWorker.TRUE_PROTO);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- done.run(TajoWorker.FALSE_PROTO);
+ RpcCallback<PrimitiveProtos.NullProto> done) {
+ QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
+ new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
+ if (queryMasterTask != null) {
+ queryMasterTask.handleTaskFailed(report);
+ } else {
+ LOG.warn("No QueryMasterTask: " + new TaskAttemptId(report.getId()));
}
+ done.run(TajoWorker.NULL_PROTO);
}
@Override
public void done(RpcController controller, TajoWorkerProtocol.TaskCompletionReport report,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- try {
- QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
- new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
- if (queryMasterTask != null) {
- queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
- }
- done.run(TajoWorker.TRUE_PROTO);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- done.run(TajoWorker.FALSE_PROTO);
+ RpcCallback<PrimitiveProtos.NullProto> done) {
+ QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
+ new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
+ if (queryMasterTask != null) {
+ queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
}
+ done.run(TajoWorker.NULL_PROTO);
}
@Override
public void doneExecutionBlock(
RpcController controller, TajoWorkerProtocol.ExecutionBlockReport request,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
+ RpcCallback<PrimitiveProtos.NullProto> done) {
QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(new QueryId(request.getEbId().getQueryId()));
if (queryMasterTask != null) {
ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId());
queryMasterTask.getEventHandler().handle(new StageShuffleReportEvent(ebId, request));
}
- done.run(TajoWorker.TRUE_PROTO);
+ done.run(TajoWorker.NULL_PROTO);
}
@Override
public void killQuery(RpcController controller, TajoIdProtos.QueryIdProto request,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
+ RpcCallback<PrimitiveProtos.NullProto> done) {
QueryId queryId = new QueryId(request);
QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
if (queryMasterTask != null) {
- Query query = queryMasterTask.getQuery();
- if (query != null) {
- query.handle(new QueryEvent(queryId, QueryEventType.KILL));
- }
+ queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL));
}
+ done.run(TajoWorker.NULL_PROTO);
}
@Override
public void executeQuery(RpcController controller,
TajoWorkerProtocol.QueryExecutionRequestProto request,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- try {
- workerContext.getWorkerSystemMetrics().counter("querymaster", "numQuery").inc();
-
- QueryId queryId = new QueryId(request.getQueryId());
- LOG.info("Receive executeQuery request:" + queryId);
- queryMaster.handle(new QueryStartEvent(queryId,
- new Session(request.getSession()),
- new QueryContext(workerContext.getQueryMaster().getContext().getConf(),
- request.getQueryContext()), request.getExprInJson().getValue(),
- request.getLogicalPlanJson().getValue()));
- done.run(TajoWorker.TRUE_PROTO);
- } catch (Exception e) {
- workerContext.getWorkerSystemMetrics().counter("querymaster", "errorQuery").inc();
- LOG.error(e.getMessage(), e);
- done.run(TajoWorker.FALSE_PROTO);
- }
+ RpcCallback<PrimitiveProtos.NullProto> done) {
+ workerContext.getWorkerSystemMetrics().counter("querymaster", "numQuery").inc();
+
+ QueryId queryId = new QueryId(request.getQueryId());
+ LOG.info("Receive executeQuery request:" + queryId);
+ queryMaster.handle(new QueryStartEvent(queryId,
+ new Session(request.getSession()),
+ new QueryContext(workerContext.getQueryMaster().getContext().getConf(),
+ request.getQueryContext()), request.getExprInJson().getValue(),
+ request.getLogicalPlanJson().getValue()));
+ done.run(TajoWorker.NULL_PROTO);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 0d26e6c..cd4b6a6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import static org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.Interface;
public class ExecutionBlockContext {
/** class logger */
@@ -78,6 +79,8 @@ public class ExecutionBlockContext {
private TajoQueryEngine queryEngine;
private RpcClientManager connManager;
private InetSocketAddress qmMasterAddr;
+ private NettyClientBase client;
+ private QueryMasterProtocol.QueryMasterProtocolService.Interface stub;
private WorkerConnectionInfo queryMaster;
private TajoConf systemConf;
// for the doAs block
@@ -132,16 +135,14 @@ public class ExecutionBlockContext {
// initialize DFS and LocalFileSystems
this.taskOwner = taskOwner;
+ this.stub = getRpcClient().getStub();
this.reporter.startReporter();
-
// resource intiailization
try{
this.resource.initialize(queryContext, plan);
} catch (Throwable e) {
try {
- NettyClientBase client = getQueryMasterConnection();
- QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
- stub.killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
+ getStub().killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
} catch (Throwable t) {
//ignore
}
@@ -153,9 +154,20 @@ public class ExecutionBlockContext {
return resource;
}
- public NettyClientBase getQueryMasterConnection()
+ private NettyClientBase getRpcClient()
throws NoSuchMethodException, ConnectException, ClassNotFoundException {
- return connManager.getClient(qmMasterAddr, QueryMasterProtocol.class, true);
+ if (client != null) return client;
+
+ client = connManager.newClient(qmMasterAddr, QueryMasterProtocol.class, true);
+ return client;
+ }
+
+ public Interface getStub() {
+ return stub;
+ }
+
+ public boolean isStopped() {
+ return stop.get();
}
public void stop(){
@@ -184,6 +196,7 @@ public class ExecutionBlockContext {
tasks.clear();
resource.release();
+ RpcClientManager.cleanup(client);
}
public TajoConf getConf() {
@@ -282,8 +295,7 @@ public class ExecutionBlockContext {
/* This case is that worker did not ran tasks */
if(completedTasksNum.get() == 0) return;
- NettyClientBase client = getQueryMasterConnection();
- QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
+ Interface stub = getStub();
ExecutionBlockReport.Builder reporterBuilder = ExecutionBlockReport.newBuilder();
reporterBuilder.setEbId(ebId.getProto());
@@ -379,10 +391,8 @@ public class ExecutionBlockContext {
public void run() {
while (!reporterStop.get() && !Thread.interrupted()) {
- NettyClientBase client = null;
try {
- client = getQueryMasterConnection();
- QueryMasterProtocol.QueryMasterProtocolService.Interface masterStub = client.getStub();
+ Interface masterStub = getStub();
if(tasks.size() == 0){
masterStub.ping(null, getExecutionBlockId().getProto(), NullCallback.get());
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 79b83e4..b666f80 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -77,6 +77,7 @@ import static org.apache.tajo.conf.TajoConf.ConfVars;
public class TajoWorker extends CompositeService {
public static final PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
public static final PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
+ public static final PrimitiveProtos.NullProto NULL_PROTO = PrimitiveProtos.NullProto.newBuilder().build();
private static final Log LOG = LogFactory.getLog(TajoWorker.class);
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 71d96c4..bbf8564 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -127,6 +127,7 @@ public class TajoWorkerManagerService extends CompositeService
done.run(TajoWorker.TRUE_PROTO);
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
+ controller.setFailed(t.getMessage());
done.run(TajoWorker.FALSE_PROTO);
}
}
@@ -142,6 +143,7 @@ public class TajoWorkerManagerService extends CompositeService
done.run(TajoWorker.TRUE_PROTO);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
+ controller.setFailed(e.getMessage());
done.run(TajoWorker.FALSE_PROTO);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index a983f78..53ed73e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -440,9 +440,7 @@ public class Task {
executionBlockContext.completedTasksNum.incrementAndGet();
context.getHashShuffleAppenderManager().finalizeTask(taskId);
- NettyClientBase client = executionBlockContext.getQueryMasterConnection();
-
- QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = client.getStub();
+ QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub();
if (context.isStopped()) {
context.setExecutorProgress(0.0f);
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 6076913..31f25f0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -194,24 +194,8 @@ public class TaskRunner extends AbstractService {
CallFuture<TaskRequestProto> callFuture = null;
TaskRequestProto taskRequest = null;
- while(!stopped) {
- NettyClientBase client;
- try {
- client = executionBlockContext.getQueryMasterConnection();
- } catch (ConnectException ce) {
- // NettyClientBase throws ConnectTimeoutException if connection was failed
- stop();
- getContext().stopTaskRunner(getId());
- LOG.error("Connecting to QueryMaster was failed.", ce);
- break;
- } catch (Throwable t) {
- LOG.fatal("Unable to handle exception: " + t.getMessage(), t);
- stop();
- getContext().stopTaskRunner(getId());
- break;
- }
-
- QueryMasterProtocolService.Interface qmClientService = client.getStub();
+ while(!stopped && !executionBlockContext.isStopped()) {
+ QueryMasterProtocolService.Interface qmClientService = executionBlockContext.getStub();
try {
if (callFuture == null) {
@@ -243,8 +227,12 @@ public class TaskRunner extends AbstractService {
}
continue;
} catch (ExecutionException ee) {
- LOG.error(ee.getMessage(), ee);
- break;
+ if(!getContext().isStopped()){
+ LOG.error(ee.getMessage(), ee);
+ } else {
+ /* EB is stopped */
+ break;
+ }
}
if (taskRequest != null) {
@@ -253,9 +241,6 @@ public class TaskRunner extends AbstractService {
// immediately.
if (taskRequest.getShouldDie()) {
LOG.info("Received ShouldDie flag:" + getId());
- stop();
- //notify to TaskRunnerManager
- getContext().stopTaskRunner(getId());
} else {
getContext().getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc();
LOG.info("Accumulated Received Task: " + (++receivedNum));
@@ -268,7 +253,7 @@ public class TaskRunner extends AbstractService {
}
LOG.info("Initializing: " + taskAttemptId);
- Task task;
+ Task task = null;
try {
task = new Task(getId(), getTaskBaseDir(), taskAttemptId, executionBlockContext,
new TaskRequestImpl(taskRequest));
@@ -283,20 +268,22 @@ public class TaskRunner extends AbstractService {
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
fatalError(qmClientService, taskAttemptId, t.getMessage());
+ if(task != null) {
+ task.cleanupTask();
+ }
} finally {
callFuture = null;
taskRequest = null;
}
}
- } else {
- stop();
- //notify to TaskRunnerManager
- getContext().stopTaskRunner(getId());
}
} catch (Throwable t) {
LOG.fatal(t.getMessage(), t);
}
}
+ stop();
+ //notify to TaskRunnerManager
+ getContext().stopTaskRunner(getId());
}
});
taskLauncher.start();
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/proto/QueryMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/QueryMasterProtocol.proto b/tajo-core/src/main/proto/QueryMasterProtocol.proto
index ae20309..855c2c6 100644
--- a/tajo-core/src/main/proto/QueryMasterProtocol.proto
+++ b/tajo-core/src/main/proto/QueryMasterProtocol.proto
@@ -34,13 +34,13 @@ package hadoop.yarn;
service QueryMasterProtocolService {
//from Worker
rpc getTask(GetTaskRequestProto) returns (TaskRequestProto);
- rpc statusUpdate (TaskStatusProto) returns (BoolProto);
- rpc ping (ExecutionBlockIdProto) returns (BoolProto);
- rpc fatalError(TaskFatalErrorReport) returns (BoolProto);
- rpc done (TaskCompletionReport) returns (BoolProto);
- rpc doneExecutionBlock(ExecutionBlockReport) returns (BoolProto);
+ rpc statusUpdate (TaskStatusProto) returns (NullProto);
+ rpc ping (ExecutionBlockIdProto) returns (NullProto);
+ rpc fatalError(TaskFatalErrorReport) returns (NullProto);
+ rpc done (TaskCompletionReport) returns (NullProto);
+ rpc doneExecutionBlock(ExecutionBlockReport) returns (NullProto);
//from TajoMaster's QueryJobManager
- rpc killQuery(QueryIdProto) returns (BoolProto);
- rpc executeQuery(QueryExecutionRequestProto) returns (BoolProto);
+ rpc killQuery(QueryIdProto) returns (NullProto);
+ rpc executeQuery(QueryExecutionRequestProto) returns (NullProto);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 8f6f9ed..0d86527 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -195,7 +195,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
if (maxRetries > retries) {
retries++;
- LOG.warn(future.cause() + " Try to reconnect : " + getKey().addr);
+ LOG.warn(getErrorMessage(ExceptionUtils.getMessage(future.cause())) + " Try to reconnect : " + getKey().addr);
try {
Thread.sleep(RpcConstants.DEFAULT_PAUSE);
} catch (InterruptedException e) {
@@ -246,8 +246,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
private String getErrorMessage(String message) {
return "Exception [" + getKey().protocolClass.getCanonicalName() +
- "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
- getChannel().remoteAddress()) + ")]: " + message;
+ "(" + getKey().addr + ")]: " + message;
}
@Override
@@ -332,7 +331,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
throws Exception {
Throwable rootCause = ExceptionUtils.getRootCause(cause);
- LOG.error(getKey().addr + "," + getKey().protocolClass + "," + ExceptionUtils.getMessage(rootCause), rootCause);
+ LOG.error(getErrorMessage(ExceptionUtils.getMessage(rootCause)), rootCause);
if (cause instanceof RecoverableException) {
sendException((RecoverableException) cause);
http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
index f8def7f..111754e 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
@@ -130,6 +130,15 @@ public class RpcClientManager {
return newClient(new RpcConnectionKey(addr, protocolClass, asyncMode), retries, timeout, timeUnit, enablePing);
}
+ public synchronized <T extends NettyClientBase> T newClient(InetSocketAddress addr,
+ Class<?> protocolClass,
+ boolean asyncMode)
+ throws NoSuchMethodException, ClassNotFoundException, ConnectException {
+
+ return newClient(new RpcConnectionKey(addr, protocolClass, asyncMode),
+ retries, getTimeoutSeconds(), TimeUnit.SECONDS, true);
+ }
+
public synchronized <T extends NettyClientBase> T newClient(RpcConnectionKey key,
int retries,
long timeout,