You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/03/27 09:56:22 UTC
tajo git commit: TAJO-1440: Some tests fail in parallel test environment in TestKillQuery.
Repository: tajo
Updated Branches:
refs/heads/master b1e174eec -> 373d53cb9
TAJO-1440: Some tests fail in parallel test environment in TestKillQuery.
Closes #472
Signed-off-by: Jinho Kim <jh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/373d53cb
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/373d53cb
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/373d53cb
Branch: refs/heads/master
Commit: 373d53cb93ac98fce07c0be0a626c08a86cf113e
Parents: b1e174e
Author: Jongyoung Park <em...@gmail.com>
Authored: Fri Mar 27 17:54:20 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Fri Mar 27 17:54:20 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 ++
.../java/org/apache/tajo/querymaster/Query.java | 5 +--
.../tajo/querymaster/QueryMasterTask.java | 13 +++++--
.../apache/tajo/querymaster/TestKillQuery.java | 40 +++++++++++++++-----
4 files changed, 44 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/373d53cb/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 60cb0f7..9895656 100644
--- a/CHANGES
+++ b/CHANGES
@@ -47,6 +47,9 @@ Release 0.11.0 - unreleased
BUG FIXES
+ TAJO-1440: Some tests fail in parallel test environment in TestKillQuery.
+ (Contributed by Jongyoung Park. Committed by jinho)
+
TAJO-1147: Simple query doesn't work in Web UI.
(Contributed by Jongyoung Park. Committed by jaehwa)
http://git-wip-us.apache.org/repos/asf/tajo/blob/373d53cb/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index c2740e5..1ce15fc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@ -393,8 +393,7 @@ public class Query implements EventHandler<QueryEvent> {
query.getExecutionBlockCursor().nextBlock());
stage.setPriority(query.priority--);
query.addStage(stage);
-
- stage.handle(new StageEvent(stage.getId(), StageEventType.SQ_INIT));
+ stage.getEventHandler().handle(new StageEvent(stage.getId(), StageEventType.SQ_INIT));
LOG.debug("Schedule unit plan: \n" + stage.getBlock().getPlan());
}
}
@@ -630,7 +629,7 @@ public class Query implements EventHandler<QueryEvent> {
Stage nextStage = new Stage(query.context, query.getPlan(), nextBlock);
nextStage.setPriority(query.priority--);
query.addStage(nextStage);
- nextStage.handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT));
+ nextStage.getEventHandler().handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT));
LOG.info("Scheduling Stage:" + nextStage.getId());
if(LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/373d53cb/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index f83cb1e..465fa84 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@ -38,7 +38,6 @@ import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.UnimplementedException;
@@ -117,7 +116,8 @@ public class QueryMasterTask extends CompositeService {
new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>();
public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
- QueryId queryId, Session session, QueryContext queryContext, String jsonExpr) {
+ QueryId queryId, Session session, QueryContext queryContext,
+ String jsonExpr, AsyncDispatcher dispatcher) {
super(QueryMasterTask.class.getName());
this.queryMasterContext = queryMasterContext;
@@ -126,6 +126,13 @@ public class QueryMasterTask extends CompositeService {
this.queryContext = queryContext;
this.jsonExpr = jsonExpr;
this.querySubmitTime = System.currentTimeMillis();
+ this.dispatcher = dispatcher;
+ }
+
+ public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
+ QueryId queryId, Session session, QueryContext queryContext,
+ String jsonExpr) {
+ this(queryMasterContext, queryId, session, queryContext, jsonExpr, new AsyncDispatcher());
}
@Override
@@ -145,8 +152,6 @@ public class QueryMasterTask extends CompositeService {
throw new UnimplementedException(resourceManagerClassName + " is not supported yet");
}
addService(resourceAllocator);
-
- dispatcher = new AsyncDispatcher();
addService(dispatcher);
dispatcher.register(StageEventType.class, new StageEventDispatcher());
http://git-wip-us.apache.org/repos/asf/tajo/blob/373d53cb/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index 8fb8e73..09be700 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -19,6 +19,8 @@
package org.apache.tajo.querymaster;
import com.google.common.collect.Lists;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Event;
import org.apache.tajo.*;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.benchmark.TPCH;
@@ -52,6 +54,8 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
@@ -106,29 +110,26 @@ public class TestKillQuery {
GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
globalPlanner.build(masterPlan);
+ CountDownLatch barrier = new CountDownLatch(1);
+ MockAsyncDispatch dispatch = new MockAsyncDispatch(barrier, StageEventType.SQ_INIT);
+
QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(),
- queryId, session, defaultContext, expr.toJson());
+ queryId, session, defaultContext, expr.toJson(), dispatch);
queryMasterTask.init(conf);
queryMasterTask.getQueryTaskContext().getDispatcher().start();
queryMasterTask.startQuery();
try{
- cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_RUNNING, 2);
- } finally {
- assertEquals(TajoProtos.QueryState.QUERY_RUNNING, queryMasterTask.getQuery().getSynchronizedState());
+ barrier.await(5000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ fail("Query state : " + queryMasterTask.getQuery().getSynchronizedState());
}
Stage stage = queryMasterTask.getQuery().getStages().iterator().next();
assertNotNull(stage);
- try{
- cluster.waitForStageState(stage, StageState.INITED, 2);
- } finally {
- assertEquals(StageState.INITED, stage.getSynchronizedState());
- }
-
// fire kill event
Query q = queryMasterTask.getQuery();
q.handle(new QueryEvent(queryId, QueryEventType.KILL));
@@ -223,4 +224,23 @@ public class TestKillQuery {
assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus());
}
}
+
+ static class MockAsyncDispatch extends AsyncDispatcher {
+ private CountDownLatch latch;
+ private Enum eventType;
+
+ MockAsyncDispatch(CountDownLatch latch, Enum eventType) {
+ super();
+ this.latch = latch;
+ this.eventType = eventType;
+ }
+
+ @Override
+ protected void dispatch(Event event) {
+ if (event.getType() == eventType) {
+ latch.countDown();
+ }
+ super.dispatch(event);
+ }
+ }
}