You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/05/12 05:14:07 UTC

[08/14] tajo git commit: TAJO-1593: Add missing stop condition to Taskrunner. (jinho)

TAJO-1593: Add missing stop condition to Taskrunner. (jinho)

Closes #562


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/ddd39213
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/ddd39213
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/ddd39213

Branch: refs/heads/index_support
Commit: ddd39213d87fd76e9e80099e92010fd71b3d363e
Parents: 1baf8dc
Author: Jinho Kim <jh...@apache.org>
Authored: Fri May 8 23:42:03 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Fri May 8 23:42:03 2015 +0900

----------------------------------------------------------------------
 CHANGES                                              |  2 ++
 .../apache/tajo/worker/ExecutionBlockContext.java    | 15 ++++++++++-----
 .../main/java/org/apache/tajo/worker/TaskRunner.java |  1 +
 .../java/org/apache/tajo/rpc/AsyncRpcServer.java     |  6 +++++-
 .../java/org/apache/tajo/rpc/BlockingRpcServer.java  |  6 +++++-
 5 files changed, 23 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/ddd39213/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 9788307..0aad306 100644
--- a/CHANGES
+++ b/CHANGES
@@ -121,6 +121,8 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1593: Add missing stop condition to Taskrunner. (jinho)
+
     TAJO-1556: "insert into select" with reordered column list does not work.
     (Contributed by Yongjin Choi, Committed by jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddd39213/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 270000a..0cc3304 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -35,9 +35,8 @@ import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.plan.serder.PlanProto;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcClientManager;
+import org.apache.tajo.rpc.*;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.NetUtils;
@@ -50,6 +49,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -307,7 +307,10 @@ public class ExecutionBlockContext {
           getWorkerContext().getHashShuffleAppenderManager().close(ebId);
       if (shuffles == null) {
         reporterBuilder.addAllIntermediateEntries(intermediateEntries);
-        stub.doneExecutionBlock(null, reporterBuilder.build(), NullCallback.get());
+
+        CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
+        stub.doneExecutionBlock(callFuture.getController(), reporterBuilder.build(), callFuture);
+        callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
         return;
       }
 
@@ -340,7 +343,9 @@ public class ExecutionBlockContext {
       }
     }
     try {
-      stub.doneExecutionBlock(null, reporterBuilder.build(), NullCallback.get());
+      CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
+      stub.doneExecutionBlock(callFuture.getController(), reporterBuilder.build(), callFuture);
+      callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
     } catch (Throwable e) {
       // can't send report to query master
       LOG.fatal(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddd39213/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 31f25f0..774f358 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -241,6 +241,7 @@ public class TaskRunner extends AbstractService {
                 // immediately.
                 if (taskRequest.getShouldDie()) {
                   LOG.info("Received ShouldDie flag:" + getId());
+                  break;
                 } else {
                   getContext().getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc();
                   LOG.info("Accumulated Received Task: " + (++receivedNum));

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddd39213/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
index 22f47b0..134b3cf 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
@@ -142,7 +142,11 @@ public class AsyncRpcServer extends NettyServerBase {
           ctx.close();
         }
         Throwable rootCause = ExceptionUtils.getRootCause(cause);
-        LOG.fatal(ExceptionUtils.getMessage(rootCause), rootCause);
+        if(rootCause == null) {
+          LOG.fatal(ExceptionUtils.getMessage(cause), cause);
+        } else {
+          LOG.fatal(ExceptionUtils.getMessage(rootCause), rootCause);
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddd39213/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
index 93c28e3..007ada5 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
@@ -133,7 +133,11 @@ public class BlockingRpcServer extends NettyServerBase {
           ctx.close();
         }
         Throwable rootCause = ExceptionUtils.getRootCause(cause);
-        LOG.fatal(ExceptionUtils.getMessage(rootCause), rootCause);
+        if(rootCause == null) {
+          LOG.fatal(ExceptionUtils.getMessage(cause), cause);
+        } else {
+          LOG.fatal(ExceptionUtils.getMessage(rootCause), rootCause);
+        }
       }
     }
   }