You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/03/27 09:56:22 UTC

tajo git commit: TAJO-1440: Some tests fail in parallel test environment in TestKillQuery.

Repository: tajo
Updated Branches:
  refs/heads/master b1e174eec -> 373d53cb9


TAJO-1440: Some tests fail in parallel test environment in TestKillQuery.

Closes #472

Signed-off-by: Jinho Kim <jh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/373d53cb
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/373d53cb
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/373d53cb

Branch: refs/heads/master
Commit: 373d53cb93ac98fce07c0be0a626c08a86cf113e
Parents: b1e174e
Author: Jongyoung Park <em...@gmail.com>
Authored: Fri Mar 27 17:54:20 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Fri Mar 27 17:54:20 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 ++
 .../java/org/apache/tajo/querymaster/Query.java |  5 +--
 .../tajo/querymaster/QueryMasterTask.java       | 13 +++++--
 .../apache/tajo/querymaster/TestKillQuery.java  | 40 +++++++++++++++-----
 4 files changed, 44 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/373d53cb/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 60cb0f7..9895656 100644
--- a/CHANGES
+++ b/CHANGES
@@ -47,6 +47,9 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1440: Some tests fail in parallel test environment in TestKillQuery.
+    (Contributed by Jongyoung Park. Committed by jinho)
+
     TAJO-1147: Simple query doesn't work in Web UI.
     (Contributed by Jongyoung Park. Committed by jaehwa)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/373d53cb/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index c2740e5..1ce15fc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@ -393,8 +393,7 @@ public class Query implements EventHandler<QueryEvent> {
           query.getExecutionBlockCursor().nextBlock());
       stage.setPriority(query.priority--);
       query.addStage(stage);
-
-      stage.handle(new StageEvent(stage.getId(), StageEventType.SQ_INIT));
+      stage.getEventHandler().handle(new StageEvent(stage.getId(), StageEventType.SQ_INIT));
       LOG.debug("Schedule unit plan: \n" + stage.getBlock().getPlan());
     }
   }
@@ -630,7 +629,7 @@ public class Query implements EventHandler<QueryEvent> {
       Stage nextStage = new Stage(query.context, query.getPlan(), nextBlock);
       nextStage.setPriority(query.priority--);
       query.addStage(nextStage);
-      nextStage.handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT));
+      nextStage.getEventHandler().handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT));
 
       LOG.info("Scheduling Stage:" + nextStage.getId());
       if(LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/373d53cb/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index f83cb1e..465fa84 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@ -38,7 +38,6 @@ import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.UnimplementedException;
@@ -117,7 +116,8 @@ public class QueryMasterTask extends CompositeService {
       new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>();
 
   public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
-                         QueryId queryId, Session session, QueryContext queryContext, String jsonExpr) {
+                         QueryId queryId, Session session, QueryContext queryContext,
+                         String jsonExpr, AsyncDispatcher dispatcher) {
 
     super(QueryMasterTask.class.getName());
     this.queryMasterContext = queryMasterContext;
@@ -126,6 +126,13 @@ public class QueryMasterTask extends CompositeService {
     this.queryContext = queryContext;
     this.jsonExpr = jsonExpr;
     this.querySubmitTime = System.currentTimeMillis();
+    this.dispatcher = dispatcher;
+  }
+
+  public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
+                         QueryId queryId, Session session, QueryContext queryContext,
+                         String jsonExpr) {
+    this(queryMasterContext, queryId, session, queryContext, jsonExpr, new AsyncDispatcher());
   }
 
   @Override
@@ -145,8 +152,6 @@ public class QueryMasterTask extends CompositeService {
         throw new UnimplementedException(resourceManagerClassName + " is not supported yet");
       }
       addService(resourceAllocator);
-
-      dispatcher = new AsyncDispatcher();
       addService(dispatcher);
 
       dispatcher.register(StageEventType.class, new StageEventDispatcher());

http://git-wip-us.apache.org/repos/asf/tajo/blob/373d53cb/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index 8fb8e73..09be700 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -19,6 +19,8 @@
 package org.apache.tajo.querymaster;
 
 import com.google.common.collect.Lists;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.tajo.*;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.benchmark.TPCH;
@@ -52,6 +54,8 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.*;
 
@@ -106,29 +110,26 @@ public class TestKillQuery {
     GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
     globalPlanner.build(masterPlan);
 
+    CountDownLatch barrier  = new CountDownLatch(1);
+    MockAsyncDispatch dispatch = new MockAsyncDispatch(barrier, StageEventType.SQ_INIT);
+
     QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
     QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(),
-        queryId, session, defaultContext, expr.toJson());
+        queryId, session, defaultContext, expr.toJson(), dispatch);
 
     queryMasterTask.init(conf);
     queryMasterTask.getQueryTaskContext().getDispatcher().start();
     queryMasterTask.startQuery();
 
     try{
-      cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_RUNNING, 2);
-    } finally {
-      assertEquals(TajoProtos.QueryState.QUERY_RUNNING, queryMasterTask.getQuery().getSynchronizedState());
+      barrier.await(5000, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      fail("Query state : " + queryMasterTask.getQuery().getSynchronizedState());
     }
 
     Stage stage = queryMasterTask.getQuery().getStages().iterator().next();
     assertNotNull(stage);
 
-    try{
-      cluster.waitForStageState(stage, StageState.INITED, 2);
-    } finally {
-      assertEquals(StageState.INITED, stage.getSynchronizedState());
-    }
-
     // fire kill event
     Query q = queryMasterTask.getQuery();
     q.handle(new QueryEvent(queryId, QueryEventType.KILL));
@@ -223,4 +224,23 @@ public class TestKillQuery {
       assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus());
     }
   }
+
+  static class MockAsyncDispatch extends AsyncDispatcher {
+    private CountDownLatch latch;
+    private Enum eventType;
+
+    MockAsyncDispatch(CountDownLatch latch, Enum eventType) {
+      super();
+      this.latch = latch;
+      this.eventType = eventType;
+    }
+
+    @Override
+    protected void dispatch(Event event) {
+      if (event.getType() == eventType) {
+        latch.countDown();
+      }
+      super.dispatch(event);
+    }
+  }
 }