You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/09/24 08:45:56 UTC

git commit: TAJO-1025: Network disconnection during query processing can cause infinite exceptions. (Jihun Kang via jinho)

Repository: tajo
Updated Branches:
  refs/heads/master 5a7e27254 -> 9e026a9a2


TAJO-1025: Network disconnection during query processing can cause infinite exceptions. (Jihun Kang via jinho)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/9e026a9a
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/9e026a9a
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/9e026a9a

Branch: refs/heads/master
Commit: 9e026a9a26a1004d4742f0e44d5b5d71691195ad
Parents: 5a7e272
Author: jhkim <jh...@apache.org>
Authored: Wed Sep 24 15:45:04 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Wed Sep 24 15:45:04 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +++
 .../tajo/worker/ExecutionBlockContext.java      |  4 ++-
 .../java/org/apache/tajo/worker/TaskRunner.java | 26 +++++++++++++++++++-
 .../apache/tajo/rpc/RemoteCallException.java    |  3 +++
 4 files changed, 34 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/9e026a9a/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 3372eb6..7cbd524 100644
--- a/CHANGES
+++ b/CHANGES
@@ -152,6 +152,9 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1025: Network disconnection during query processing can cause 
+    infinite exceptions. (Jihun Kang via jinho)
+
     TAJO-1047: DefaultTaskScheduler:allocateRackTask is failed occasionally
     on JDK 1.7. (jinho)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/9e026a9a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 1ec8a88..f18723f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -43,6 +43,7 @@ import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.Pair;
 import org.apache.tajo.worker.event.TaskRunnerStartEvent;
+import org.jboss.netty.channel.ConnectTimeoutException;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.util.Timer;
 
@@ -146,7 +147,8 @@ public class ExecutionBlockContext {
     return resource;
   }
 
-  public QueryMasterProtocol.QueryMasterProtocolService.Interface getQueryMasterStub() throws Exception {
+  public QueryMasterProtocol.QueryMasterProtocolService.Interface getQueryMasterStub()
+      throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException {
     NettyClientBase clientBase = null;
     try {
       clientBase = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);

http://git-wip-us.apache.org/repos/asf/tajo/blob/9e026a9a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index e4771a6..1910575 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -35,6 +35,7 @@ import org.apache.tajo.engine.query.QueryUnitRequestImpl;
 import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NullCallback;
+import org.jboss.netty.channel.ConnectTimeoutException;
 
 import java.util.concurrent.*;
 
@@ -180,6 +181,7 @@ public class TaskRunner extends AbstractService {
     try {
 
       taskLauncher = new Thread(new Runnable() {
+
         @Override
         public void run() {
           int receivedNum = 0;
@@ -190,7 +192,20 @@ public class TaskRunner extends AbstractService {
             QueryMasterProtocolService.Interface qmClientService;
             try {
               qmClientService = getContext().getQueryMasterStub();
+            } catch (ConnectTimeoutException ce) {
+              // NettyClientBase throws ConnectTimeoutException if connection was failed
+              stop();
+              getContext().stopTaskRunner(getId());
+              LOG.error("Connecting to QueryMaster was failed.", ce);
+              break;
+            } catch (Throwable t) {
+              LOG.fatal("Unable to handle exception: " + t.getMessage(), t);
+              stop();
+              getContext().stopTaskRunner(getId());
+              break;
+            }
 
+            try {
               if (callFuture == null) {
                 callFuture = new CallFuture<QueryUnitRequestProto>();
                 LOG.info("Request GetTask: " + getId());
@@ -200,7 +215,7 @@ public class TaskRunner extends AbstractService {
                     .setWorkerId(getContext().getWorkerContext().getConnectionInfo().getId())
                     .build();
 
-                qmClientService.getTask(null, request, callFuture);
+                qmClientService.getTask(callFuture.getController(), request, callFuture);
               }
               try {
                 // wait for an assigning task for 3 seconds
@@ -213,6 +228,11 @@ public class TaskRunner extends AbstractService {
                 if(stopped) {
                   break;
                 }
+
+                if(callFuture.getController().failed()){
+                  LOG.error(callFuture.getController().errorText());
+                  break;
+                }
                 // if there has been no assigning task for a given period,
                 // TaskRunner will retry to request an assigning task.
                 if (LOG.isDebugEnabled()) {
@@ -262,6 +282,10 @@ public class TaskRunner extends AbstractService {
                     taskRequest = null;
                   }
                 }
+              } else {
+                stop();
+                //notify to TaskRunnerManager
+                getContext().stopTaskRunner(getId());
               }
             } catch (Throwable t) {
               LOG.fatal(t.getMessage(), t);

http://git-wip-us.apache.org/repos/asf/tajo/blob/9e026a9a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
index 949aa58..90ee58a 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
@@ -41,6 +41,9 @@ public class RemoteCallException extends RemoteException {
   public RemoteCallException(int seqId, Throwable t) {
     super(t);
     this.seqId = seqId;
+    if (t != null) {
+      originExceptionClass = t.getClass().getCanonicalName();
+    }
   }
 
   public RpcResponse getResponse() {