You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/09/27 01:52:10 UTC
[4/8] git commit: TAJO-1025: Network disconnection during query processing can cause infinite exceptions. (Jihun Kang via jinho)
TAJO-1025: Network disconnection during query processing can cause infinite exceptions. (Jihun Kang via jinho)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/9e026a9a
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/9e026a9a
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/9e026a9a
Branch: refs/heads/block_iteration
Commit: 9e026a9a26a1004d4742f0e44d5b5d71691195ad
Parents: 5a7e272
Author: jhkim <jh...@apache.org>
Authored: Wed Sep 24 15:45:04 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Wed Sep 24 15:45:04 2014 +0900
----------------------------------------------------------------------
CHANGES | 3 +++
.../tajo/worker/ExecutionBlockContext.java | 4 ++-
.../java/org/apache/tajo/worker/TaskRunner.java | 26 +++++++++++++++++++-
.../apache/tajo/rpc/RemoteCallException.java | 3 +++
4 files changed, 34 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/9e026a9a/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 3372eb6..7cbd524 100644
--- a/CHANGES
+++ b/CHANGES
@@ -152,6 +152,9 @@ Release 0.9.0 - unreleased
BUG FIXES
+ TAJO-1025: Network disconnection during query processing can cause
+ infinite exceptions. (Jihun Kang via jinho)
+
TAJO-1047: DefaultTaskScheduler:allocateRackTask is failed occasionally
on JDK 1.7. (jinho)
http://git-wip-us.apache.org/repos/asf/tajo/blob/9e026a9a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 1ec8a88..f18723f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -43,6 +43,7 @@ import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.Pair;
import org.apache.tajo.worker.event.TaskRunnerStartEvent;
+import org.jboss.netty.channel.ConnectTimeoutException;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.util.Timer;
@@ -146,7 +147,8 @@ public class ExecutionBlockContext {
return resource;
}
- public QueryMasterProtocol.QueryMasterProtocolService.Interface getQueryMasterStub() throws Exception {
+ public QueryMasterProtocol.QueryMasterProtocolService.Interface getQueryMasterStub()
+ throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException {
NettyClientBase clientBase = null;
try {
clientBase = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);
http://git-wip-us.apache.org/repos/asf/tajo/blob/9e026a9a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index e4771a6..1910575 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -35,6 +35,7 @@ import org.apache.tajo.engine.query.QueryUnitRequestImpl;
import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NullCallback;
+import org.jboss.netty.channel.ConnectTimeoutException;
import java.util.concurrent.*;
@@ -180,6 +181,7 @@ public class TaskRunner extends AbstractService {
try {
taskLauncher = new Thread(new Runnable() {
+
@Override
public void run() {
int receivedNum = 0;
@@ -190,7 +192,20 @@ public class TaskRunner extends AbstractService {
QueryMasterProtocolService.Interface qmClientService;
try {
qmClientService = getContext().getQueryMasterStub();
+ } catch (ConnectTimeoutException ce) {
+ // NettyClientBase throws ConnectTimeoutException if connection was failed
+ stop();
+ getContext().stopTaskRunner(getId());
+ LOG.error("Connecting to QueryMaster was failed.", ce);
+ break;
+ } catch (Throwable t) {
+ LOG.fatal("Unable to handle exception: " + t.getMessage(), t);
+ stop();
+ getContext().stopTaskRunner(getId());
+ break;
+ }
+ try {
if (callFuture == null) {
callFuture = new CallFuture<QueryUnitRequestProto>();
LOG.info("Request GetTask: " + getId());
@@ -200,7 +215,7 @@ public class TaskRunner extends AbstractService {
.setWorkerId(getContext().getWorkerContext().getConnectionInfo().getId())
.build();
- qmClientService.getTask(null, request, callFuture);
+ qmClientService.getTask(callFuture.getController(), request, callFuture);
}
try {
// wait for an assigning task for 3 seconds
@@ -213,6 +228,11 @@ public class TaskRunner extends AbstractService {
if(stopped) {
break;
}
+
+ if(callFuture.getController().failed()){
+ LOG.error(callFuture.getController().errorText());
+ break;
+ }
// if there has been no assigning task for a given period,
// TaskRunner will retry to request an assigning task.
if (LOG.isDebugEnabled()) {
@@ -262,6 +282,10 @@ public class TaskRunner extends AbstractService {
taskRequest = null;
}
}
+ } else {
+ stop();
+ //notify to TaskRunnerManager
+ getContext().stopTaskRunner(getId());
}
} catch (Throwable t) {
LOG.fatal(t.getMessage(), t);
http://git-wip-us.apache.org/repos/asf/tajo/blob/9e026a9a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
index 949aa58..90ee58a 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
@@ -41,6 +41,9 @@ public class RemoteCallException extends RemoteException {
public RemoteCallException(int seqId, Throwable t) {
super(t);
this.seqId = seqId;
+ if (t != null) {
+ originExceptionClass = t.getClass().getCanonicalName();
+ }
}
public RpcResponse getResponse() {