You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/01/08 08:19:39 UTC

tajo git commit: TAJO-1279: Cleanup TajoAsyncDispatcher and interrupt stop events. (jinho)

Repository: tajo
Updated Branches:
  refs/heads/master 7615b7576 -> 6582d8656


TAJO-1279: Cleanup TajoAsyncDispatcher and interrupt stop events. (jinho)

Closes #331


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/6582d865
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/6582d865
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/6582d865

Branch: refs/heads/master
Commit: 6582d8656a3e8147984a86a9e62688b52ec1f681
Parents: 7615b75
Author: jhkim <jh...@apache.org>
Authored: Thu Jan 8 16:18:31 2015 +0900
Committer: jhkim <jh...@apache.org>
Committed: Thu Jan 8 16:18:31 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../java/org/apache/tajo/benchmark/TPCH.java    |  22 ++
 .../tajo/master/DefaultTaskScheduler.java       |   6 +-
 .../apache/tajo/master/TajoAsyncDispatcher.java | 232 -------------------
 .../tajo/master/event/QueryStopEvent.java       |  47 ++++
 .../master/querymaster/QueryInProgress.java     |  14 +-
 .../tajo/master/querymaster/QueryJobEvent.java  |   1 +
 .../master/querymaster/QueryJobManager.java     |  16 +-
 .../tajo/master/querymaster/QueryMaster.java    |  17 +-
 .../master/querymaster/QueryMasterTask.java     |  44 ++--
 .../apache/tajo/master/querymaster/Stage.java   |   8 +-
 .../java/org/apache/tajo/QueryTestCaseBase.java |   6 +
 .../org/apache/tajo/TajoTestingCluster.java     |   8 +-
 .../test/java/org/apache/tajo/TpchTestBase.java |   8 +-
 .../org/apache/tajo/cli/tsql/TestTajoCli.java   |   4 +-
 .../java/org/apache/tajo/jdbc/TestTajoJdbc.java |  29 ++-
 .../tajo/master/querymaster/TestKillQuery.java  |  37 ++-
 .../tajo/scheduler/TestFifoScheduler.java       |  43 ++--
 .../org/apache/tajo/worker/TestHistory.java     |  44 ++--
 19 files changed, 236 insertions(+), 353 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 9ca4579..fe777a7 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,9 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1279: Cleanup TajoAsyncDispatcher and interrupt stop events. 
+    (jinho)
+
     TAJO-1285: Refactoring Magic Number to HAConstants. 
     (DaeMyung Kang via jaehwa)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
index f4b4d6a..e2ea25c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
+++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.benchmark;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
@@ -33,8 +34,10 @@ import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.storage.StorageConstants;
 
+import java.io.File;
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.List;
 import java.util.Map;
 
 public class TPCH extends BenchmarkSet {
@@ -225,4 +228,23 @@ public class TPCH extends BenchmarkSet {
       throw new ServiceException(s);
     }
   }
+
+  public static List<String> getDataFilePaths(String... tables) {
+    List<String> tablePaths = Lists.newArrayList();
+    File file;
+    for (String table : tables) {
+      file = getDataFile(table);
+      tablePaths.add(file.getAbsolutePath());
+    }
+    return tablePaths;
+  }
+
+  public static File getDataFile(String table) {
+    File file = new File("src/test/tpch/" + table + ".tbl");
+    if (!file.exists()) {
+      file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + table
+          + ".tbl");
+    }
+    return file;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index 1cd6587..d47c93a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -145,8 +145,10 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     }
 
     // Return all of request callbacks instantly.
-    for (TaskRequestEvent req : taskRequests.taskRequestQueue) {
-      req.getCallback().run(stopTaskRunnerReq);
+    if(taskRequests != null){
+      for (TaskRequestEvent req : taskRequests.taskRequestQueue) {
+        req.getCallback().run(stopTaskRunnerReq);
+      }
     }
 
     LOG.info("Task Scheduler stopped");

http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
deleted file mode 100644
index 751b21b..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-public class TajoAsyncDispatcher extends AbstractService  implements Dispatcher {
-
-  private static final Log LOG = LogFactory.getLog(TajoAsyncDispatcher.class);
-
-  private final BlockingQueue<Event> eventQueue;
-  private volatile boolean stopped = false;
-
-  private Thread eventHandlingThread;
-  protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
-  private boolean exitOnDispatchException;
-
-  private String id;
-
-  public TajoAsyncDispatcher(String id) {
-    this(id, new LinkedBlockingQueue<Event>());
-  }
-
-  public TajoAsyncDispatcher(String id, BlockingQueue<Event> eventQueue) {
-    super(TajoAsyncDispatcher.class.getName());
-    this.id = id;
-    this.eventQueue = eventQueue;
-    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
-  }
-
-  Runnable createThread() {
-    return new Runnable() {
-      @Override
-      public void run() {
-        while (!stopped && !Thread.currentThread().isInterrupted()) {
-          Event event;
-          try {
-            event = eventQueue.take();
-            if(LOG.isDebugEnabled()) {
-              LOG.debug(id + ",event take:" + event.getType() + "," + event);
-            }
-          } catch(InterruptedException ie) {
-            if (!stopped) {
-              LOG.warn("AsyncDispatcher thread interrupted");
-            }
-            return;
-          }
-          dispatch(event);
-        }
-      }
-    };
-  }
-
-  @Override
-  public synchronized void init(Configuration conf) {
-    this.exitOnDispatchException =
-        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
-            Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
-    super.init(conf);
-  }
-
-  @Override
-  public void start() {
-    //start all the components
-    super.start();
-    eventHandlingThread = new Thread(createThread());
-    eventHandlingThread.setName("AsyncDispatcher event handler");
-    eventHandlingThread.start();
-
-    LOG.info("AsyncDispatcher started:" + id);
-  }
-
-  @Override
-  public synchronized void stop() {
-    if(stopped) {
-      return;
-    }
-    stopped = true;
-    if (eventHandlingThread != null) {
-      eventHandlingThread.interrupt();
-      try {
-        eventHandlingThread.join();
-      } catch (InterruptedException ie) {
-        LOG.warn("Interrupted Exception while stopping");
-      }
-    }
-
-    // stop all the components
-    super.stop();
-
-    LOG.info("AsyncDispatcher stopped:" + id);
-  }
-
-  @SuppressWarnings("unchecked")
-  protected void dispatch(Event event) {
-    //all events go thru this loop
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
-          + event.toString());
-    }
-    Class<? extends Enum> type = event.getType().getDeclaringClass();
-
-    try{
-      EventHandler handler = eventDispatchers.get(type);
-      if(handler != null) {
-        handler.handle(event);
-      } else {
-        throw new Exception("No handler for registered for " + type);
-      }
-    } catch (Throwable t) {
-      //TODO Maybe log the state of the queue
-      LOG.fatal("Error in dispatcher thread:" + event.getType(), t);
-      if (exitOnDispatchException && (ShutdownHookManager.get().isShutdownInProgress()) == false) {
-        LOG.info("Exiting, bye..");
-        System.exit(-1);
-      }
-    } finally {
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void register(Class<? extends Enum> eventType,
-                       EventHandler handler) {
-    /* check to see if we have a listener registered */
-    EventHandler<Event> registeredHandler = (EventHandler<Event>)
-        eventDispatchers.get(eventType);
-    LOG.debug("Registering " + eventType + " for " + handler.getClass());
-    if (registeredHandler == null) {
-      eventDispatchers.put(eventType, handler);
-    } else if (!(registeredHandler instanceof MultiListenerHandler)){
-      /* for multiple listeners of an event add the multiple listener handler */
-      MultiListenerHandler multiHandler = new MultiListenerHandler();
-      multiHandler.addHandler(registeredHandler);
-      multiHandler.addHandler(handler);
-      eventDispatchers.put(eventType, multiHandler);
-    } else {
-      /* already a multilistener, just add to it */
-      MultiListenerHandler multiHandler
-          = (MultiListenerHandler) registeredHandler;
-      multiHandler.addHandler(handler);
-    }
-  }
-
-  @Override
-  public EventHandler getEventHandler() {
-    return new GenericEventHandler();
-  }
-
-  class GenericEventHandler implements EventHandler<Event> {
-    public void handle(Event event) {
-      /* all this method does is enqueue all the events onto the queue */
-      int qSize = eventQueue.size();
-      if (qSize !=0 && qSize %1000 == 0) {
-        LOG.info("Size of event-queue is " + qSize);
-      }
-      int remCapacity = eventQueue.remainingCapacity();
-      if (remCapacity < 1000) {
-        LOG.warn("Very low remaining capacity in the event-queue: "
-            + remCapacity);
-      }
-      try {
-        if(LOG.isDebugEnabled()) {
-          LOG.debug(id + ",add event:" +
-              event.getType() + "," + event + "," +
-              (eventHandlingThread == null ? "null" : eventHandlingThread.isAlive()));
-        }
-        eventQueue.put(event);
-      } catch (InterruptedException e) {
-        if (!stopped) {
-          LOG.warn("AsyncDispatcher thread interrupted", e);
-        }
-        throw new YarnRuntimeException(e);
-      }
-    }
-  }
-
-  /**
-   * Multiplexing an event. Sending it to different handlers that
-   * are interested in the event.
-   */
-  static class MultiListenerHandler implements EventHandler<Event> {
-    List<EventHandler<Event>> listofHandlers;
-
-    public MultiListenerHandler() {
-      listofHandlers = new ArrayList<EventHandler<Event>>();
-    }
-
-    @Override
-    public void handle(Event event) {
-      for (EventHandler<Event> handler: listofHandlers) {
-        handler.handle(event);
-      }
-    }
-
-    void addHandler(EventHandler<Event> handler) {
-      listofHandlers.add(handler);
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStopEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStopEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStopEvent.java
new file mode 100644
index 0000000..6d57d4a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStopEvent.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.QueryId;
+
+/**
+ * This event is conveyed to QueryMaster.
+ */
+public class QueryStopEvent extends AbstractEvent {
+  public enum EventType {
+    QUERY_STOP
+  }
+
+  private final QueryId queryId;
+
+  public QueryStopEvent(QueryId queryId) {
+    super(EventType.QUERY_STOP);
+    this.queryId = queryId;
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getName() + "," + getType() + "," + queryId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index ca0bd72..0a87990 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -23,16 +23,15 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.ContainerProtocol;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
-import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.rm.WorkerResourceManager;
 import org.apache.tajo.master.session.Session;
@@ -55,7 +54,7 @@ public class QueryInProgress extends CompositeService {
 
   private Session session;
 
-  private TajoAsyncDispatcher dispatcher;
+  private AsyncDispatcher dispatcher;
 
   private LogicalRootNode plan;
 
@@ -88,7 +87,7 @@ public class QueryInProgress extends CompositeService {
 
   @Override
   public void init(Configuration conf) {
-    dispatcher = new TajoAsyncDispatcher("QueryInProgress:" + queryId);
+    dispatcher = new AsyncDispatcher();
     this.addService(dispatcher);
 
     dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler());
@@ -193,8 +192,6 @@ public class QueryInProgress extends CompositeService {
             new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInProgress.getQueryInfo()));
       } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_START) {
         submmitQueryToMaster();
-      } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_FINISH) {
-        stop();
       } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
         kill();
       }
@@ -255,7 +252,7 @@ public class QueryInProgress extends CompositeService {
   }
 
   public boolean isStarted() {
-    return this.querySubmitted.get();
+    return !stopped.get() && this.querySubmitted.get();
   }
 
   private void heartbeat(QueryInfo queryInfo) {
@@ -289,7 +286,8 @@ public class QueryInProgress extends CompositeService {
 
 
       if (isFinishState(this.queryInfo.getQueryState())) {
-        stop();
+        masterContext.getQueryJobManager().getEventHandler().handle(
+            new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
index 811de1b..ce30ec7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
@@ -37,6 +37,7 @@ public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> {
     QUERY_JOB_START,
     QUERY_JOB_HEARTBEAT,
     QUERY_JOB_FINISH,
+    QUERY_JOB_STOP,
     QUERY_MASTER_START,
     QUERY_MASTER_STOP,
     QUERY_JOB_KILL

http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index 34a0d01..13f6456 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -40,7 +40,6 @@ import org.apache.tajo.scheduler.SimpleFifoScheduler;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -187,15 +186,16 @@ public class QueryJobManager extends CompositeService {
         LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
         return;
       }
-      if(queryInProgress.isStarted()){
+
+      if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
+        stopQuery(event.getQueryInfo().getQueryId());
+      } else if (queryInProgress.isStarted()) {
         queryInProgress.getEventHandler().handle(event);
-      } else {
-        if(event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL){
-          scheduler.removeQuery(queryInProgress.getQueryId());
-          queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
+      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
+        scheduler.removeQuery(queryInProgress.getQueryId());
+        queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
 
-          stopQuery(queryInProgress.getQueryId());
-        }
+        stopQuery(queryInProgress.getQueryId());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 7623026..641de78 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
@@ -35,8 +36,8 @@ import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.event.QueryStartEvent;
+import org.apache.tajo.master.event.QueryStopEvent;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
@@ -66,7 +67,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
   private Clock clock;
 
-  private TajoAsyncDispatcher dispatcher;
+  private AsyncDispatcher dispatcher;
 
   private GlobalPlanner globalPlanner;
 
@@ -110,12 +111,13 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
       clock = new SystemClock();
 
-      this.dispatcher = new TajoAsyncDispatcher("querymaster_" + System.currentTimeMillis());
+      this.dispatcher = new AsyncDispatcher();
       addIfService(dispatcher);
 
       globalPlanner = new GlobalPlanner(systemConf, workerContext);
 
       dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler());
+      dispatcher.register(QueryStopEvent.EventType.class, new QueryStopEventHandler());
 
     } catch (Throwable t) {
       LOG.error(t.getMessage(), t);
@@ -360,7 +362,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
       return eventExecutor;
     }
 
-    public TajoAsyncDispatcher getDispatcher() {
+    public AsyncDispatcher getDispatcher() {
       return dispatcher;
     }
 
@@ -491,6 +493,13 @@ public class QueryMaster extends CompositeService implements EventHandler {
     }
   }
 
+  private class QueryStopEventHandler implements EventHandler<QueryStopEvent> {
+    @Override
+    public void handle(QueryStopEvent event) {
+      queryMasterContext.stopQuery(event.getQueryId());
+    }
+  }
+
   class QueryHeartbeatThread extends Thread {
     public QueryHeartbeatThread() {
       super("QueryHeartbeatThread");

http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 720d60a..9c789a5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tajo.*;
@@ -37,33 +38,31 @@ import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.plan.LogicalOptimizer;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.LogicalPlanner;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
-import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.NodeType;
-import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.UnimplementedException;
+import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.TajoContainerProxy;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.master.session.Session;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.verifier.VerifyException;
-import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.StorageProperty;
 import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.util.metrics.TajoMetrics;
 import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
 import org.apache.tajo.worker.AbstractResourceAllocator;
@@ -71,7 +70,6 @@ import org.apache.tajo.worker.TajoResourceAllocator;
 
 import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -104,7 +102,7 @@ public class QueryMasterTask extends CompositeService {
 
   private String logicalPlanJson;
 
-  private TajoAsyncDispatcher dispatcher;
+  private AsyncDispatcher dispatcher;
 
   private final long querySubmitTime;
 
@@ -154,7 +152,7 @@ public class QueryMasterTask extends CompositeService {
       }
       addService(resourceAllocator);
 
-      dispatcher = new TajoAsyncDispatcher(queryId.toString());
+      dispatcher = new AsyncDispatcher();
       addService(dispatcher);
 
       dispatcher.register(StageEventType.class, new StageEventDispatcher());
@@ -200,8 +198,6 @@ public class QueryMasterTask extends CompositeService {
       LOG.fatal(t.getMessage(), t);
     }
 
-    CallFuture future = new CallFuture();
-
     RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf());
     NettyClientBase tmClient = null;
     try {
@@ -225,21 +221,12 @@ public class QueryMasterTask extends CompositeService {
         tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
             TajoMasterProtocol.class, true);
       }
-
-      TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
-      masterClientService.stopQueryMaster(null, queryId.getProto(), future);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
     } finally {
       connPool.releaseConnection(tmClient);
     }
 
-    try {
-      future.get(3, TimeUnit.SECONDS);
-    } catch (Throwable t) {
-      LOG.warn(t);
-    }
-
     super.stop();
 
     //TODO change report to tajo master
@@ -339,7 +326,8 @@ public class QueryMasterTask extends CompositeService {
         }
       }
       LOG.info("Query final state: " + query.getSynchronizedState());
-      queryMasterContext.stopQuery(queryId);
+
+      queryMasterContext.getEventHandler().handle(new QueryStopEvent(queryId));
     }
   }
 
@@ -620,7 +608,7 @@ public class QueryMasterTask extends CompositeService {
       return eventHandler;
     }
 
-    public TajoAsyncDispatcher getDispatcher() {
+    public AsyncDispatcher getDispatcher() {
       return dispatcher;
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java
index e421417..0515e72 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java
@@ -736,16 +736,16 @@ public class Stage implements EventHandler<StageEvent> {
           stage.finalizeStats();
           state = StageState.SUCCEEDED;
         } else {
+          ExecutionBlock parent = stage.getMasterPlan().getParent(stage.getBlock());
+          DataChannel channel = stage.getMasterPlan().getChannel(stage.getId(), parent.getId());
+          setShuffleIfNecessary(stage, channel);
+          initTaskScheduler(stage);
           // execute pre-processing asyncronously
           stage.getContext().getQueryMasterContext().getEventExecutor()
               .submit(new Runnable() {
                         @Override
                         public void run() {
                           try {
-                            ExecutionBlock parent = stage.getMasterPlan().getParent(stage.getBlock());
-                            DataChannel channel = stage.getMasterPlan().getChannel(stage.getId(), parent.getId());
-                            setShuffleIfNecessary(stage, channel);
-                            initTaskScheduler(stage);
                             schedule(stage);
                             stage.totalScheduledObjectsCount = stage.getTaskScheduler().remainingScheduledObjectNum();
                             LOG.info(stage.totalScheduledObjectsCount + " objects are scheduled");

http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index 875e450..1605560 100644
--- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -201,6 +201,12 @@ public class QueryTestCaseBase {
     client.close();
   }
 
+  @Before
+  public void printTestName() {
+    /* protect a travis stalled build */
+    System.out.println("Run: " + name.getMethodName());
+  }
+
   public QueryTestCaseBase() {
     // hive 0.12 does not support quoted identifier.
     // So, we use lower case database names when Tajo uses HCatalogStore.

http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 841be45..0d2f6fa 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -164,8 +164,6 @@ public class TajoTestingCluster {
     if (!StringUtils.isEmpty(LOG_LEVEL)) {
       Level defaultLevel = Logger.getRootLogger().getLevel();
       Logger.getLogger("org.apache.tajo").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel));
-      Logger.getLogger("org.apache.tajo.master.TajoAsyncDispatcher").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(),
-        defaultLevel));
       Logger.getLogger("org.apache.hadoop").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel));
       Logger.getLogger("org.apache.zookeeper").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel));
       Logger.getLogger("BlockStateChange").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel));
@@ -630,8 +628,10 @@ public class TajoTestingCluster {
       this.clusterTestBuildDir = null;
     }
 
-    hbaseUtil.stopZooKeeperCluster();
-    hbaseUtil.stopHBaseCluster();
+    if(hbaseUtil != null) {
+      hbaseUtil.stopZooKeeperCluster();
+      hbaseUtil.stopHBaseCluster();
+    }
 
     LOG.info("Minicluster is down");
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java b/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java
index 0f713e5..055dd02 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java
@@ -22,10 +22,10 @@ import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.benchmark.TPCH;
-import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.KeyValueSet;
 
 import java.io.File;
 import java.io.IOException;
@@ -73,11 +73,7 @@ public class TpchTestBase {
     tables = new String[names.length][];
     File file;
     for (int i = 0; i < names.length; i++) {
-      file = new File("src/test/tpch/" + names[i] + ".tbl");
-      if(!file.exists()) {
-        file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + names[i]
-            + ".tbl");
-      }
+      file = TPCH.getDataFile(names[i]);
       tables[i] = FileUtil.readTextFile(file).split("\n");
       paths[i] = file.getAbsolutePath();
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
index b14bfa9..aff1677 100644
--- a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
@@ -321,14 +321,14 @@ public class TestTajoCli {
     setVar(tajoCli, SessionVars.CLI_FORMATTER_CLASS, TajoCliOutputTestFormatter.class.getName());
 
     ByteArrayOutputStream out = new ByteArrayOutputStream();
-    tajoCli = new TajoCli(tajoConf, new String[]{}, System.in, out);
+    TajoCli tajoCli = new TajoCli(tajoConf, new String[]{}, System.in, out);
     tajoCli.executeMetaCommand("\\admin -showmasters");
 
     String consoleResult = new String(out.toByteArray());
 
     String masterAddress = tajoCli.getContext().getConf().getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
     String host = masterAddress.split(":")[0];
-
+    tajoCli.close();
     assertEquals(consoleResult, host + "\n");
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
index 99baeba..1c763e2 100644
--- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
+++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
@@ -26,7 +26,6 @@ import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.client.QueryClient;
-import org.apache.tajo.conf.TajoConf;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -34,11 +33,13 @@ import org.junit.experimental.categories.Category;
 
 import java.net.InetSocketAddress;
 import java.sql.*;
-import java.util.*;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.junit.Assert.*;
-import static org.junit.Assert.assertEquals;
 
 @Category(IntegrationTest.class)
 public class TestTajoJdbc extends QueryTestCaseBase {
@@ -113,6 +114,9 @@ public class TestTajoJdbc extends QueryTestCaseBase {
       if (stmt != null) {
         stmt.close();
       }
+      if (conn != null) {
+        conn.close();
+      }
     }
   }
 
@@ -194,6 +198,9 @@ public class TestTajoJdbc extends QueryTestCaseBase {
       if (stmt != null) {
         stmt.close();
       }
+      if (conn != null) {
+        conn.close();
+      }
     }
   }
 
@@ -494,11 +501,11 @@ public class TestTajoJdbc extends QueryTestCaseBase {
     int result;
     Statement stmt = null;
     ResultSet res = null;
-
+    Connection conn = null;
     try {
       String connUri = buildConnectionUri(tajoMasterAddress.getHostName(), tajoMasterAddress.getPort(),
         DEFAULT_DATABASE_NAME);
-      Connection conn = DriverManager.getConnection(connUri);
+      conn = DriverManager.getConnection(connUri);
       assertTrue(conn.isValid(100));
 
       stmt = conn.createStatement();
@@ -532,6 +539,10 @@ public class TestTajoJdbc extends QueryTestCaseBase {
       if (stmt != null) {
         stmt.close();
       }
+
+      if(conn != null) {
+        conn.close();
+      }
     }
   }
 
@@ -539,11 +550,11 @@ public class TestTajoJdbc extends QueryTestCaseBase {
   public void testSortWithDateTime() throws Exception {
     Statement stmt = null;
     ResultSet res = null;
+    Connection conn = null;
     int result;
 
     // skip this test if catalog uses HCatalogStore.
     // It is because HCatalogStore does not support Time data type.
-
     try {
       if (!testingCluster.isHCatalogStoreRunning()) {
         executeDDL("create_table_with_date_ddl.sql", "table1");
@@ -551,7 +562,7 @@ public class TestTajoJdbc extends QueryTestCaseBase {
         String connUri = buildConnectionUri(tajoMasterAddress.getHostName(),
           tajoMasterAddress.getPort(), "TestTajoJdbc");
 
-        Connection conn = DriverManager.getConnection(connUri);
+        conn = DriverManager.getConnection(connUri);
         assertTrue(conn.isValid(100));
 
         stmt = conn.createStatement();
@@ -576,6 +587,10 @@ public class TestTajoJdbc extends QueryTestCaseBase {
       if (stmt != null) {
         stmt.close();
       }
+
+      if(conn != null) {
+        conn.close();
+      }
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java
index c1f4178..8ca4cff 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java
@@ -20,34 +20,51 @@ package org.apache.tajo.master.querymaster;
 
 import org.apache.tajo.*;
 import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.benchmark.TPCH;
 import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.plan.LogicalOptimizer;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.LogicalPlanner;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.event.QueryEvent;
 import org.apache.tajo.master.event.QueryEventType;
 import org.apache.tajo.master.session.Session;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.*;
 
-@Category(IntegrationTest.class)
 public class TestKillQuery {
   private static TajoTestingCluster cluster;
   private static TajoConf conf;
+  private static TajoClient client;
 
   @BeforeClass
   public static void setUp() throws Exception {
-    cluster = TpchTestBase.getInstance().getTestingCluster();
+    cluster = new TajoTestingCluster();
+    cluster.startMiniClusterInLocal(1);
     conf = cluster.getConfiguration();
+    client = new TajoClientImpl(cluster.getConfiguration());
+    File file = TPCH.getDataFile("lineitem");
+    client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) "
+        + "using text location 'file://" + file.getAbsolutePath() + "'");
+    assertTrue(client.existTable("default.lineitem"));
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if (client != null) client.close();
+    if (cluster != null) cluster.shutdownMiniCluster();
   }
 
   @Test
@@ -56,7 +73,7 @@ public class TestKillQuery {
     QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
     Session session = LocalTajoTestingUtility.createDummySession();
     CatalogService catalog = cluster.getMaster().getCatalog();
-    String query = "select l_orderkey from lineitem group by l_orderkey";
+    String query = "select l_orderkey, l_partkey from lineitem group by l_orderkey, l_partkey order by l_orderkey";
 
     LogicalPlanner planner = new LogicalPlanner(catalog);
     LogicalOptimizer optimizer = new LogicalOptimizer(conf);
@@ -99,7 +116,7 @@ public class TestKillQuery {
     q.handle(new QueryEvent(queryId, QueryEventType.KILL));
 
     try{
-      cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 10);
+      cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50);
     } finally {
       assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState());
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
index 18764c2..acd6b71 100644
--- a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
+++ b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
@@ -18,7 +18,10 @@
 
 package org.apache.tajo.scheduler;
 
-import org.apache.tajo.*;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.benchmark.TPCH;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.client.TajoClientUtil;
@@ -27,47 +30,52 @@ import org.apache.tajo.ipc.ClientProtos;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 
+import java.io.File;
 import java.sql.ResultSet;
 
 import static org.junit.Assert.*;
 
-@Category(IntegrationTest.class)
 public class TestFifoScheduler {
   private static TajoTestingCluster cluster;
   private static TajoConf conf;
   private static TajoClient client;
+  private static String query =
+      "select l_orderkey, l_partkey from lineitem group by l_orderkey, l_partkey order by l_orderkey";
 
   @BeforeClass
   public static void setUp() throws Exception {
-    cluster = TpchTestBase.getInstance().getTestingCluster();
+    cluster = new TajoTestingCluster();
+    cluster.startMiniClusterInLocal(1);
     conf = cluster.getConfiguration();
-    client = new TajoClientImpl(conf);
+    client = new TajoClientImpl(cluster.getConfiguration());
+    File file = TPCH.getDataFile("lineitem");
+    client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) "
+        + "using text location 'file://" + file.getAbsolutePath() + "'");
+    assertTrue(client.existTable("default.lineitem"));
   }
 
   @AfterClass
   public static void tearDown() throws Exception {
-    client.close();
+    if (client != null) client.close();
+    if (cluster != null) cluster.shutdownMiniCluster();
   }
 
   @Test
   public final void testKillScheduledQuery() throws Exception {
-    ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(1) from lineitem");
-    ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select sleep(1) from lineitem");
+    ClientProtos.SubmitQueryResponse res = client.executeQuery(query);
+    ClientProtos.SubmitQueryResponse res2 = client.executeQuery(query);
     QueryId queryId = new QueryId(res.getQueryId());
     QueryId queryId2 = new QueryId(res2.getQueryId());
 
     cluster.waitForQueryRunning(queryId);
     client.killQuery(queryId2);
     assertEquals(TajoProtos.QueryState.QUERY_KILLED, client.getQueryStatus(queryId2).getState());
-
-    client.killQuery(queryId); // cleanup
   }
 
   @Test
   public final void testForwardedQuery() throws Exception {
-    ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(1) from lineitem");
+    ClientProtos.SubmitQueryResponse res = client.executeQuery(query);
     ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select * from lineitem limit 1");
     assertTrue(res.getIsForwarded());
     assertFalse(res2.getIsForwarded());
@@ -79,16 +87,14 @@ public class TestFifoScheduler {
     assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId2).getState());
     ResultSet resSet = TajoClientUtil.createResultSet(conf, client, res2);
     assertNotNull(resSet);
-
-    client.killQuery(queryId); //cleanup
   }
 
   @Test
   public final void testScheduledQuery() throws Exception {
     ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(1) from lineitem");
-    ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select sleep(1) from lineitem");
-    ClientProtos.SubmitQueryResponse res3 = client.executeQuery("select sleep(1) from lineitem");
-    ClientProtos.SubmitQueryResponse res4 = client.executeQuery("select sleep(1) from lineitem");
+    ClientProtos.SubmitQueryResponse res2 = client.executeQuery(query);
+    ClientProtos.SubmitQueryResponse res3 = client.executeQuery(query);
+    ClientProtos.SubmitQueryResponse res4 = client.executeQuery(query);
 
     QueryId queryId = new QueryId(res.getQueryId());
     QueryId queryId2 = new QueryId(res2.getQueryId());
@@ -103,9 +109,8 @@ public class TestFifoScheduler {
     assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId3).getState());
     assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId4).getState());
 
-    client.killQuery(queryId2);
-    client.killQuery(queryId3);
     client.killQuery(queryId4);
-    client.killQuery(queryId);
+    client.killQuery(queryId3);
+    client.killQuery(queryId2);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
index d320077..77aa1d4 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
@@ -20,19 +20,20 @@ package org.apache.tajo.worker;
 
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.service.Service;
-import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.benchmark.TPCH;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.querymaster.QueryInfo;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
@@ -40,29 +41,34 @@ import java.util.Map;
 import static org.junit.Assert.*;
 
 public class TestHistory {
-  private TajoTestingCluster cluster;
-  private TajoMaster master;
-  private TajoConf conf;
-  private TajoClient client;
-
-  @Before
-  public void setUp() throws Exception {
-    cluster = TpchTestBase.getInstance().getTestingCluster();
+  private static TajoTestingCluster cluster;
+  private static TajoMaster master;
+  private static  TajoConf conf;
+  private static TajoClient client;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    cluster = new TajoTestingCluster();
+    cluster.startMiniClusterInLocal(1);
     master = cluster.getMaster();
     conf = cluster.getConfiguration();
-    client = new TajoClientImpl(conf);
+    client = new TajoClientImpl(cluster.getConfiguration());
+    File file = TPCH.getDataFile("lineitem");
+    client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) "
+        + "using text location 'file://" + file.getAbsolutePath() + "'");
+    assertTrue(client.existTable("default.lineitem"));
   }
 
-  @After
-  public void tearDown() {
-    client.close();
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if (client != null) client.close();
+    if (cluster != null) cluster.shutdownMiniCluster();
   }
 
-
   @Test
   public final void testTaskRunnerHistory() throws IOException, ServiceException, InterruptedException {
     int beforeFinishedQueriesCount = master.getContext().getQueryJobManager().getFinishedQueries().size();
-    client.executeQueryAndGetResult("select sleep(1) from lineitem");
+    client.executeQueryAndGetResult("select count(*) from lineitem");
 
     Collection<QueryInfo> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries();
     assertTrue(finishedQueries.size() > beforeFinishedQueriesCount);
@@ -89,7 +95,7 @@ public class TestHistory {
   @Test
   public final void testTaskHistory() throws IOException, ServiceException, InterruptedException {
     int beforeFinishedQueriesCount = master.getContext().getQueryJobManager().getFinishedQueries().size();
-    client.executeQueryAndGetResult("select sleep(1) from lineitem");
+    client.executeQueryAndGetResult("select count(*) from lineitem");
 
     Collection<QueryInfo> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries();
     assertTrue(finishedQueries.size() > beforeFinishedQueriesCount);