You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/04/02 19:17:44 UTC

git commit: TAJO-15: The integration test hangs on Mac OS X. (hyunsik)

Updated Branches:
  refs/heads/master c2362cf3b -> fe5b030c5


TAJO-15: The integration test hangs on Mac OS X. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/fe5b030c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/fe5b030c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/fe5b030c

Branch: refs/heads/master
Commit: fe5b030c53f9babf11370cf3e483898fa2fe3063
Parents: c2362cf
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Apr 3 02:09:19 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Apr 3 02:14:33 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                        |   10 +-
 .../src/main/java/tajo/worker/TaskRunner.java      |   98 ++++-----------
 .../src/test/java/tajo/worker/TaskRunnerTest.java  |    3 +-
 3 files changed, 32 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fe5b030c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index da3a5a2..d35db48 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,7 +9,7 @@ Release 0.2.0 - unreleased
     TAJO-17: Improve the examples for jvm version and auxiliary service in 
     getting_started.apt. (hyunsik)
 
-    TAJO-10: Modify git ignore to include Apache derby log file  
+    TAJO-10: Modify git ignore to include Apache derby log file. (hsaputra)
 
     TAJO-12: Add information in README on how to subscribe to mailing 
     lists (mattmann, hyunsik)
@@ -22,9 +22,13 @@ Release 0.2.0 - unreleased
 
   BUG FIXES
 
-    TAJO-8: MapReduce's ShuffleHandler and Tajo's PullServerAuxService conflict each other
+    TAJO-15: The Integration test is getting hanged on Mac OS X. (hyunsik)
 
-    TAJO-13: Update the groupId property in the pom.xml to use ASF org instead of kr.ac.korea.dbserver
+    TAJO-8: MapReduce's ShuffleHandler and Tajo's PullServerAuxService conflict
+    each other. (jhkim)
+
+    TAJO-13: Update the groupId property in the pom.xml to use ASF org instead
+    of kr.ac.korea.dbserver (hsaputra)
 
     TAJO-1: RCFileWrapper always reads whole columns regardless of the target 
     schema. (jihoonson via hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fe5b030c/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
index cf0dbc4..9e77164 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
@@ -18,7 +18,6 @@
 
 package tajo.worker;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -39,7 +38,6 @@ import tajo.QueryUnitAttemptId;
 import tajo.SubQueryId;
 import tajo.TajoProtos.TaskAttemptState;
 import tajo.conf.TajoConf.ConfVars;
-import tajo.engine.MasterWorkerProtos;
 import tajo.engine.MasterWorkerProtos.QueryUnitRequestProto;
 import tajo.engine.query.QueryUnitRequestImpl;
 import tajo.ipc.MasterWorkerProtocol;
@@ -50,19 +48,19 @@ import tajo.rpc.NullCallback;
 import tajo.rpc.ProtoAsyncRpcClient;
 import tajo.util.TajoIdUtils;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
 import java.util.Map;
 import java.util.concurrent.*;
 
+import static tajo.engine.MasterWorkerProtos.TaskFatalErrorReport;
+
 public class TaskRunner extends AbstractService {
   private static final Log LOG = LogFactory.getLog(TaskRunner.class);
   private QueryConf conf;
 
   private volatile boolean stopped = false;
-  private volatile boolean isOnline = false;
 
   private final SubQueryId subQueryId;
   private ApplicationId appId;
@@ -70,7 +68,6 @@ public class TaskRunner extends AbstractService {
   private final ContainerId containerId;
 
   // Cluster Management
-  private ProtoAsyncRpcClient client;
   private MasterWorkerProtocolService.Interface master;
 
   // Query Processing
@@ -78,7 +75,6 @@ public class TaskRunner extends AbstractService {
   private FileSystem defaultFS;
 
   private TajoQueryEngine queryEngine;
-  private QueryLauncher queryLauncher;
   private final int coreNum = 4;
   private final ExecutorService fetchLauncher =
       Executors.newFixedThreadPool(coreNum * 4);
@@ -97,14 +93,12 @@ public class TaskRunner extends AbstractService {
       final SubQueryId subQueryId,
       final NodeId nodeId,
       UserGroupInformation taskOwner,
-      ProtoAsyncRpcClient client,
       Interface master, ContainerId containerId) {
     super(TaskRunner.class.getName());
     this.subQueryId = subQueryId;
     this.appId = subQueryId.getQueryId().getApplicationId();
     this.nodeId = nodeId;
     this.taskOwner = taskOwner;
-    this.client = client;
     this.master = master;
     this.containerId = containerId;
   }
@@ -143,7 +137,6 @@ public class TaskRunner extends AbstractService {
 
       // Setup QueryEngine according to the query plan
       // Here, we can setup row-based query engine or columnar query engine.
-      this.queryLauncher = new QueryLauncher();
       this.queryEngine = new TajoQueryEngine(conf);
     } catch (Throwable t) {
       LOG.error(t);
@@ -168,8 +161,6 @@ public class TaskRunner extends AbstractService {
       synchronized (this) {
         notifyAll();
       }
-
-      client.close();
     }
   }
 
@@ -215,6 +206,14 @@ public class TaskRunner extends AbstractService {
     }
   }
 
+  static void fatalError(MasterWorkerProtocolService.Interface proxy,
+                                 QueryUnitAttemptId taskAttemptId, String message) {
+    TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder()
+        .setId(taskAttemptId.getProto())
+        .setErrorMessage(message);
+    proxy.fatalError(null, builder.build(), NullCallback.get());
+  }
+
   public void run() {
     LOG.info("Tajo Worker startup");
 
@@ -229,12 +228,6 @@ public class TaskRunner extends AbstractService {
 
           while(!stopped) {
             try {
-
-              while(!stopped && !queryLauncher.hasAvailableSlot()) {
-                Thread.sleep(1000);
-              }
-
-              if (!stopped) {
                 if (callFuture == null) {
                   callFuture = new CallFuture2<QueryUnitRequestProto>();
                   master.getTask(null, ((ContainerIdPBImpl) containerId).getProto(),
@@ -250,38 +243,38 @@ public class TaskRunner extends AbstractService {
                   if (taskRequest.getShouldDie()) {
                     LOG.info("received ShouldDie flag");
                     stop();
+
                   } else {
+
                     LOG.info("Accumulated Received Task: " + (++receivedNum));
-                    QueryUnitAttemptId taskAttemptId =
-                        new QueryUnitAttemptId(taskRequest.getId());
+
+                    QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId());
                     if (tasks.containsKey(taskAttemptId)) {
-                      MasterWorkerProtos.TaskFatalErrorReport.Builder builder =
-                      MasterWorkerProtos.TaskFatalErrorReport.newBuilder()
-                          .setErrorMessage("Duplicate Task Attempt: " +
-                          taskAttemptId);
-                      master.fatalError(null, builder.build(), NullCallback.get());
+                      fatalError(master, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId);
                       continue;
                     }
+
                     Path taskTempDir = localFS.makeQualified(
                         lDirAllocator.getLocalPathForWrite(baseDir +
                             "/" + taskAttemptId.getQueryUnitId().getId()
                             + "_" + taskAttemptId.getId(), conf));
+
                     LOG.info("Initializing: " + taskAttemptId);
                     Task task = new Task(taskAttemptId, workerContext, master,
                         new QueryUnitRequestImpl(taskRequest), taskTempDir);
                     tasks.put(taskAttemptId, task);
+
                     task.init();
                     if (task.hasFetchPhase()) {
                       task.fetch(); // The fetch is performed in an asynchronous way.
                     }
-
+                    // task.run() is a blocking call.
                     task.run();
 
                     callFuture = null;
                     taskRequest = null;
                   }
                 }
-              }
             } catch (Throwable t) {
               LOG.error(t);
             }
@@ -299,11 +292,7 @@ public class TaskRunner extends AbstractService {
           t.abort();
         }
       }
-
-      client.close();
     }
-
-    LOG.info("TaskRunner (" + nodeId + ") main thread exiting");
   }
 
   private class ShutdownHook implements Runnable {
@@ -314,10 +303,6 @@ public class TaskRunner extends AbstractService {
     }
   }
 
-  public String getServerName() {
-    return nodeId.toString();
-  }
-
   /**
    * @return true if a stop has been requested.
    */
@@ -325,42 +310,6 @@ public class TaskRunner extends AbstractService {
     return this.stopped;
   }
 
-  public boolean isOnline() {
-    return this.isOnline;
-  }
-
-  public void shutdown(final String msg) {
-
-  }
-
-  @VisibleForTesting
-  Task getTask(QueryUnitAttemptId id) {
-    return this.tasks.get(id);
-  }
-
-  private class QueryLauncher {
-    private final ThreadPoolExecutor executor
-        = new ThreadPoolExecutor(coreNum, coreNum * 4, 0L, TimeUnit.MILLISECONDS,
-        new LinkedBlockingQueue<Runnable>(coreNum * 4));
-    private boolean stopped = false;
-
-    public void schedule(Task task) throws InterruptedException {
-
-    }
-
-    public boolean hasAvailableSlot() {
-      return executor.getQueue().size() < coreNum;
-    }
-  }
-
-  public Path getTaskTempDir(QueryUnitAttemptId taskAttemptId)
-      throws IOException {
-    return lDirAllocator.
-        getLocalPathToRead(baseDir + "/" + taskAttemptId.getId(),
-            conf);
-  }
-
-
   /**
    * 1st Arg: TaskRunnerListener hostname
    * 2nd Arg: TaskRunnerListener port
@@ -368,8 +317,6 @@ public class TaskRunner extends AbstractService {
    * 4th Arg: NodeId
    */
   public static void main(String[] args) throws Exception {
-    LOG.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
-    System.out.println(System.getenv("CLASSPATH"));
     // Restore QueryConf
     final QueryConf conf = new QueryConf();
     conf.addResource(new Path(QueryConf.FILENAME));
@@ -402,7 +349,7 @@ public class TaskRunner extends AbstractService {
     ProtoAsyncRpcClient client;
     MasterWorkerProtocolService.Interface master;
 
-    // Create TaskUmbilicalProtocol as actual task owner.
+    // Create MasterWorkerProtocol as actual task owner.
     client =
         taskOwner.doAs(new PrivilegedExceptionAction<ProtoAsyncRpcClient>() {
           @Override
@@ -413,8 +360,11 @@ public class TaskRunner extends AbstractService {
     master = client.getStub();
 
 
-    TaskRunner taskRunner = new TaskRunner(subQueryId, nodeId, taskOwner, client, master, containerId);
+    TaskRunner taskRunner = new TaskRunner(subQueryId, nodeId, taskOwner, master, containerId);
     taskRunner.init(conf);
     taskRunner.start();
+    client.close();
+    LOG.info("TaskRunner (" + nodeId + ") main thread exiting");
+    System.exit(0);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fe5b030c/tajo-core/tajo-core-backend/src/test/java/tajo/worker/TaskRunnerTest.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/worker/TaskRunnerTest.java b/tajo-core/tajo-core-backend/src/test/java/tajo/worker/TaskRunnerTest.java
index 131e797..7d082bd 100644
--- a/tajo-core/tajo-core-backend/src/test/java/tajo/worker/TaskRunnerTest.java
+++ b/tajo-core/tajo-core-backend/src/test/java/tajo/worker/TaskRunnerTest.java
@@ -58,8 +58,7 @@ public class TaskRunnerTest {
     nodeId.setPort(9001);
     UserGroupInformation mockTaskOwner = mock(UserGroupInformation.class);
     when(mockTaskOwner.getShortUserName()).thenReturn("hyunsik");
-    TaskRunner runner = new TaskRunner(sq1, nodeId, mockTaskOwner, mockClient,
-        mockMaster, cId);
+    TaskRunner runner = new TaskRunner(sq1, nodeId, mockTaskOwner, mockMaster, cId);
     QueryConf conf = new QueryConf();
     conf.setOutputPath(new Path("/tmp/" + q1));
     runner.init(conf);