You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/01/08 08:19:39 UTC
tajo git commit: TAJO-1279: Cleanup TajoAsyncDispatcher and interrupt stop events. (jinho)
Repository: tajo
Updated Branches:
refs/heads/master 7615b7576 -> 6582d8656
TAJO-1279: Cleanup TajoAsyncDispatcher and interrupt stop events. (jinho)
Closes #331
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/6582d865
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/6582d865
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/6582d865
Branch: refs/heads/master
Commit: 6582d8656a3e8147984a86a9e62688b52ec1f681
Parents: 7615b75
Author: jhkim <jh...@apache.org>
Authored: Thu Jan 8 16:18:31 2015 +0900
Committer: jhkim <jh...@apache.org>
Committed: Thu Jan 8 16:18:31 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../java/org/apache/tajo/benchmark/TPCH.java | 22 ++
.../tajo/master/DefaultTaskScheduler.java | 6 +-
.../apache/tajo/master/TajoAsyncDispatcher.java | 232 -------------------
.../tajo/master/event/QueryStopEvent.java | 47 ++++
.../master/querymaster/QueryInProgress.java | 14 +-
.../tajo/master/querymaster/QueryJobEvent.java | 1 +
.../master/querymaster/QueryJobManager.java | 16 +-
.../tajo/master/querymaster/QueryMaster.java | 17 +-
.../master/querymaster/QueryMasterTask.java | 44 ++--
.../apache/tajo/master/querymaster/Stage.java | 8 +-
.../java/org/apache/tajo/QueryTestCaseBase.java | 6 +
.../org/apache/tajo/TajoTestingCluster.java | 8 +-
.../test/java/org/apache/tajo/TpchTestBase.java | 8 +-
.../org/apache/tajo/cli/tsql/TestTajoCli.java | 4 +-
.../java/org/apache/tajo/jdbc/TestTajoJdbc.java | 29 ++-
.../tajo/master/querymaster/TestKillQuery.java | 37 ++-
.../tajo/scheduler/TestFifoScheduler.java | 43 ++--
.../org/apache/tajo/worker/TestHistory.java | 44 ++--
19 files changed, 236 insertions(+), 353 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 9ca4579..fe777a7 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,9 @@ Release 0.9.1 - unreleased
IMPROVEMENT
+ TAJO-1279: Cleanup TajoAsyncDispatcher and interrupt stop events.
+ (jinho)
+
TAJO-1285: Refactoring Magic Number to HAConstants.
(DaeMyung Kang via jaehwa)
http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
index f4b4d6a..e2ea25c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
+++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
@@ -18,6 +18,7 @@
package org.apache.tajo.benchmark;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
@@ -33,8 +34,10 @@ import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.storage.StorageConstants;
+import java.io.File;
import java.io.IOException;
import java.sql.SQLException;
+import java.util.List;
import java.util.Map;
public class TPCH extends BenchmarkSet {
@@ -225,4 +228,23 @@ public class TPCH extends BenchmarkSet {
throw new ServiceException(s);
}
}
+
+ public static List<String> getDataFilePaths(String... tables) {
+ List<String> tablePaths = Lists.newArrayList();
+ File file;
+ for (String table : tables) {
+ file = getDataFile(table);
+ tablePaths.add(file.getAbsolutePath());
+ }
+ return tablePaths;
+ }
+
+ public static File getDataFile(String table) {
+ File file = new File("src/test/tpch/" + table + ".tbl");
+ if (!file.exists()) {
+ file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + table
+ + ".tbl");
+ }
+ return file;
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index 1cd6587..d47c93a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -145,8 +145,10 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
}
// Return all of request callbacks instantly.
- for (TaskRequestEvent req : taskRequests.taskRequestQueue) {
- req.getCallback().run(stopTaskRunnerReq);
+ if(taskRequests != null){
+ for (TaskRequestEvent req : taskRequests.taskRequestQueue) {
+ req.getCallback().run(stopTaskRunnerReq);
+ }
}
LOG.info("Task Scheduler stopped");
http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
deleted file mode 100644
index 751b21b..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-public class TajoAsyncDispatcher extends AbstractService implements Dispatcher {
-
- private static final Log LOG = LogFactory.getLog(TajoAsyncDispatcher.class);
-
- private final BlockingQueue<Event> eventQueue;
- private volatile boolean stopped = false;
-
- private Thread eventHandlingThread;
- protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
- private boolean exitOnDispatchException;
-
- private String id;
-
- public TajoAsyncDispatcher(String id) {
- this(id, new LinkedBlockingQueue<Event>());
- }
-
- public TajoAsyncDispatcher(String id, BlockingQueue<Event> eventQueue) {
- super(TajoAsyncDispatcher.class.getName());
- this.id = id;
- this.eventQueue = eventQueue;
- this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
- }
-
- Runnable createThread() {
- return new Runnable() {
- @Override
- public void run() {
- while (!stopped && !Thread.currentThread().isInterrupted()) {
- Event event;
- try {
- event = eventQueue.take();
- if(LOG.isDebugEnabled()) {
- LOG.debug(id + ",event take:" + event.getType() + "," + event);
- }
- } catch(InterruptedException ie) {
- if (!stopped) {
- LOG.warn("AsyncDispatcher thread interrupted");
- }
- return;
- }
- dispatch(event);
- }
- }
- };
- }
-
- @Override
- public synchronized void init(Configuration conf) {
- this.exitOnDispatchException =
- conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
- Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
- super.init(conf);
- }
-
- @Override
- public void start() {
- //start all the components
- super.start();
- eventHandlingThread = new Thread(createThread());
- eventHandlingThread.setName("AsyncDispatcher event handler");
- eventHandlingThread.start();
-
- LOG.info("AsyncDispatcher started:" + id);
- }
-
- @Override
- public synchronized void stop() {
- if(stopped) {
- return;
- }
- stopped = true;
- if (eventHandlingThread != null) {
- eventHandlingThread.interrupt();
- try {
- eventHandlingThread.join();
- } catch (InterruptedException ie) {
- LOG.warn("Interrupted Exception while stopping");
- }
- }
-
- // stop all the components
- super.stop();
-
- LOG.info("AsyncDispatcher stopped:" + id);
- }
-
- @SuppressWarnings("unchecked")
- protected void dispatch(Event event) {
- //all events go thru this loop
- if (LOG.isDebugEnabled()) {
- LOG.debug("Dispatching the event " + event.getClass().getName() + "."
- + event.toString());
- }
- Class<? extends Enum> type = event.getType().getDeclaringClass();
-
- try{
- EventHandler handler = eventDispatchers.get(type);
- if(handler != null) {
- handler.handle(event);
- } else {
- throw new Exception("No handler for registered for " + type);
- }
- } catch (Throwable t) {
- //TODO Maybe log the state of the queue
- LOG.fatal("Error in dispatcher thread:" + event.getType(), t);
- if (exitOnDispatchException && (ShutdownHookManager.get().isShutdownInProgress()) == false) {
- LOG.info("Exiting, bye..");
- System.exit(-1);
- }
- } finally {
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void register(Class<? extends Enum> eventType,
- EventHandler handler) {
- /* check to see if we have a listener registered */
- EventHandler<Event> registeredHandler = (EventHandler<Event>)
- eventDispatchers.get(eventType);
- LOG.debug("Registering " + eventType + " for " + handler.getClass());
- if (registeredHandler == null) {
- eventDispatchers.put(eventType, handler);
- } else if (!(registeredHandler instanceof MultiListenerHandler)){
- /* for multiple listeners of an event add the multiple listener handler */
- MultiListenerHandler multiHandler = new MultiListenerHandler();
- multiHandler.addHandler(registeredHandler);
- multiHandler.addHandler(handler);
- eventDispatchers.put(eventType, multiHandler);
- } else {
- /* already a multilistener, just add to it */
- MultiListenerHandler multiHandler
- = (MultiListenerHandler) registeredHandler;
- multiHandler.addHandler(handler);
- }
- }
-
- @Override
- public EventHandler getEventHandler() {
- return new GenericEventHandler();
- }
-
- class GenericEventHandler implements EventHandler<Event> {
- public void handle(Event event) {
- /* all this method does is enqueue all the events onto the queue */
- int qSize = eventQueue.size();
- if (qSize !=0 && qSize %1000 == 0) {
- LOG.info("Size of event-queue is " + qSize);
- }
- int remCapacity = eventQueue.remainingCapacity();
- if (remCapacity < 1000) {
- LOG.warn("Very low remaining capacity in the event-queue: "
- + remCapacity);
- }
- try {
- if(LOG.isDebugEnabled()) {
- LOG.debug(id + ",add event:" +
- event.getType() + "," + event + "," +
- (eventHandlingThread == null ? "null" : eventHandlingThread.isAlive()));
- }
- eventQueue.put(event);
- } catch (InterruptedException e) {
- if (!stopped) {
- LOG.warn("AsyncDispatcher thread interrupted", e);
- }
- throw new YarnRuntimeException(e);
- }
- }
- }
-
- /**
- * Multiplexing an event. Sending it to different handlers that
- * are interested in the event.
- */
- static class MultiListenerHandler implements EventHandler<Event> {
- List<EventHandler<Event>> listofHandlers;
-
- public MultiListenerHandler() {
- listofHandlers = new ArrayList<EventHandler<Event>>();
- }
-
- @Override
- public void handle(Event event) {
- for (EventHandler<Event> handler: listofHandlers) {
- handler.handle(event);
- }
- }
-
- void addHandler(EventHandler<Event> handler) {
- listofHandlers.add(handler);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStopEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStopEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStopEvent.java
new file mode 100644
index 0000000..6d57d4a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStopEvent.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.QueryId;
+
+/**
+ * This event is conveyed to QueryMaster.
+ */
+public class QueryStopEvent extends AbstractEvent {
+ public enum EventType {
+ QUERY_STOP
+ }
+
+ private final QueryId queryId;
+
+ public QueryStopEvent(QueryId queryId) {
+ super(EventType.QUERY_STOP);
+ this.queryId = queryId;
+ }
+
+ public QueryId getQueryId() {
+ return queryId;
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getName() + "," + getType() + "," + queryId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index ca0bd72..0a87990 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -23,16 +23,15 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.ContainerProtocol;
import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
-import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.rm.WorkerResourceManager;
import org.apache.tajo.master.session.Session;
@@ -55,7 +54,7 @@ public class QueryInProgress extends CompositeService {
private Session session;
- private TajoAsyncDispatcher dispatcher;
+ private AsyncDispatcher dispatcher;
private LogicalRootNode plan;
@@ -88,7 +87,7 @@ public class QueryInProgress extends CompositeService {
@Override
public void init(Configuration conf) {
- dispatcher = new TajoAsyncDispatcher("QueryInProgress:" + queryId);
+ dispatcher = new AsyncDispatcher();
this.addService(dispatcher);
dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler());
@@ -193,8 +192,6 @@ public class QueryInProgress extends CompositeService {
new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInProgress.getQueryInfo()));
} else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_START) {
submmitQueryToMaster();
- } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_FINISH) {
- stop();
} else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
kill();
}
@@ -255,7 +252,7 @@ public class QueryInProgress extends CompositeService {
}
public boolean isStarted() {
- return this.querySubmitted.get();
+ return !stopped.get() && this.querySubmitted.get();
}
private void heartbeat(QueryInfo queryInfo) {
@@ -289,7 +286,8 @@ public class QueryInProgress extends CompositeService {
if (isFinishState(this.queryInfo.getQueryState())) {
- stop();
+ masterContext.getQueryJobManager().getEventHandler().handle(
+ new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo));
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
index 811de1b..ce30ec7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
@@ -37,6 +37,7 @@ public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> {
QUERY_JOB_START,
QUERY_JOB_HEARTBEAT,
QUERY_JOB_FINISH,
+ QUERY_JOB_STOP,
QUERY_MASTER_START,
QUERY_MASTER_STOP,
QUERY_JOB_KILL
http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index 34a0d01..13f6456 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -40,7 +40,6 @@ import org.apache.tajo.scheduler.SimpleFifoScheduler;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
@@ -187,15 +186,16 @@ public class QueryJobManager extends CompositeService {
LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
return;
}
- if(queryInProgress.isStarted()){
+
+ if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
+ stopQuery(event.getQueryInfo().getQueryId());
+ } else if (queryInProgress.isStarted()) {
queryInProgress.getEventHandler().handle(event);
- } else {
- if(event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL){
- scheduler.removeQuery(queryInProgress.getQueryId());
- queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
+ } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
+ scheduler.removeQuery(queryInProgress.getQueryId());
+ queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
- stopQuery(queryInProgress.getQueryId());
- }
+ stopQuery(queryInProgress.getQueryId());
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 7623026..641de78 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
@@ -35,8 +36,8 @@ import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ha.HAServiceUtil;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.event.QueryStartEvent;
+import org.apache.tajo.master.event.QueryStopEvent;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
@@ -66,7 +67,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
private Clock clock;
- private TajoAsyncDispatcher dispatcher;
+ private AsyncDispatcher dispatcher;
private GlobalPlanner globalPlanner;
@@ -110,12 +111,13 @@ public class QueryMaster extends CompositeService implements EventHandler {
clock = new SystemClock();
- this.dispatcher = new TajoAsyncDispatcher("querymaster_" + System.currentTimeMillis());
+ this.dispatcher = new AsyncDispatcher();
addIfService(dispatcher);
globalPlanner = new GlobalPlanner(systemConf, workerContext);
dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler());
+ dispatcher.register(QueryStopEvent.EventType.class, new QueryStopEventHandler());
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
@@ -360,7 +362,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
return eventExecutor;
}
- public TajoAsyncDispatcher getDispatcher() {
+ public AsyncDispatcher getDispatcher() {
return dispatcher;
}
@@ -491,6 +493,13 @@ public class QueryMaster extends CompositeService implements EventHandler {
}
}
+ private class QueryStopEventHandler implements EventHandler<QueryStopEvent> {
+ @Override
+ public void handle(QueryStopEvent event) {
+ queryMasterContext.stopQuery(event.getQueryId());
+ }
+ }
+
class QueryHeartbeatThread extends Thread {
public QueryHeartbeatThread() {
super("QueryHeartbeatThread");
http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 720d60a..9c789a5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tajo.*;
@@ -37,33 +38,31 @@ import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.plan.LogicalOptimizer;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.LogicalPlanner;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
-import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.NodeType;
-import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.UnimplementedException;
+import org.apache.tajo.ha.HAServiceUtil;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.TajoContainerProxy;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.master.session.Session;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.verifier.VerifyException;
-import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.storage.StorageProperty;
import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.ha.HAServiceUtil;
import org.apache.tajo.util.metrics.TajoMetrics;
import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
import org.apache.tajo.worker.AbstractResourceAllocator;
@@ -71,7 +70,6 @@ import org.apache.tajo.worker.TajoResourceAllocator;
import java.io.IOException;
import java.util.*;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -104,7 +102,7 @@ public class QueryMasterTask extends CompositeService {
private String logicalPlanJson;
- private TajoAsyncDispatcher dispatcher;
+ private AsyncDispatcher dispatcher;
private final long querySubmitTime;
@@ -154,7 +152,7 @@ public class QueryMasterTask extends CompositeService {
}
addService(resourceAllocator);
- dispatcher = new TajoAsyncDispatcher(queryId.toString());
+ dispatcher = new AsyncDispatcher();
addService(dispatcher);
dispatcher.register(StageEventType.class, new StageEventDispatcher());
@@ -200,8 +198,6 @@ public class QueryMasterTask extends CompositeService {
LOG.fatal(t.getMessage(), t);
}
- CallFuture future = new CallFuture();
-
RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf());
NettyClientBase tmClient = null;
try {
@@ -225,21 +221,12 @@ public class QueryMasterTask extends CompositeService {
tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
TajoMasterProtocol.class, true);
}
-
- TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
- masterClientService.stopQueryMaster(null, queryId.getProto(), future);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
} finally {
connPool.releaseConnection(tmClient);
}
- try {
- future.get(3, TimeUnit.SECONDS);
- } catch (Throwable t) {
- LOG.warn(t);
- }
-
super.stop();
//TODO change report to tajo master
@@ -339,7 +326,8 @@ public class QueryMasterTask extends CompositeService {
}
}
LOG.info("Query final state: " + query.getSynchronizedState());
- queryMasterContext.stopQuery(queryId);
+
+ queryMasterContext.getEventHandler().handle(new QueryStopEvent(queryId));
}
}
@@ -620,7 +608,7 @@ public class QueryMasterTask extends CompositeService {
return eventHandler;
}
- public TajoAsyncDispatcher getDispatcher() {
+ public AsyncDispatcher getDispatcher() {
return dispatcher;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java
index e421417..0515e72 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java
@@ -736,16 +736,16 @@ public class Stage implements EventHandler<StageEvent> {
stage.finalizeStats();
state = StageState.SUCCEEDED;
} else {
+ ExecutionBlock parent = stage.getMasterPlan().getParent(stage.getBlock());
+ DataChannel channel = stage.getMasterPlan().getChannel(stage.getId(), parent.getId());
+ setShuffleIfNecessary(stage, channel);
+ initTaskScheduler(stage);
// execute pre-processing asyncronously
stage.getContext().getQueryMasterContext().getEventExecutor()
.submit(new Runnable() {
@Override
public void run() {
try {
- ExecutionBlock parent = stage.getMasterPlan().getParent(stage.getBlock());
- DataChannel channel = stage.getMasterPlan().getChannel(stage.getId(), parent.getId());
- setShuffleIfNecessary(stage, channel);
- initTaskScheduler(stage);
schedule(stage);
stage.totalScheduledObjectsCount = stage.getTaskScheduler().remainingScheduledObjectNum();
LOG.info(stage.totalScheduledObjectsCount + " objects are scheduled");
http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index 875e450..1605560 100644
--- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -201,6 +201,12 @@ public class QueryTestCaseBase {
client.close();
}
+ @Before
+ public void printTestName() {
+ /* protect a travis stalled build */
+ System.out.println("Run: " + name.getMethodName());
+ }
+
public QueryTestCaseBase() {
// hive 0.12 does not support quoted identifier.
// So, we use lower case database names when Tajo uses HCatalogStore.
http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 841be45..0d2f6fa 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -164,8 +164,6 @@ public class TajoTestingCluster {
if (!StringUtils.isEmpty(LOG_LEVEL)) {
Level defaultLevel = Logger.getRootLogger().getLevel();
Logger.getLogger("org.apache.tajo").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel));
- Logger.getLogger("org.apache.tajo.master.TajoAsyncDispatcher").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(),
- defaultLevel));
Logger.getLogger("org.apache.hadoop").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel));
Logger.getLogger("org.apache.zookeeper").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel));
Logger.getLogger("BlockStateChange").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel));
@@ -630,8 +628,10 @@ public class TajoTestingCluster {
this.clusterTestBuildDir = null;
}
- hbaseUtil.stopZooKeeperCluster();
- hbaseUtil.stopHBaseCluster();
+ if(hbaseUtil != null) {
+ hbaseUtil.stopZooKeeperCluster();
+ hbaseUtil.stopHBaseCluster();
+ }
LOG.info("Minicluster is down");
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java b/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java
index 0f713e5..055dd02 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java
@@ -22,10 +22,10 @@ import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.benchmark.TPCH;
-import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.KeyValueSet;
import java.io.File;
import java.io.IOException;
@@ -73,11 +73,7 @@ public class TpchTestBase {
tables = new String[names.length][];
File file;
for (int i = 0; i < names.length; i++) {
- file = new File("src/test/tpch/" + names[i] + ".tbl");
- if(!file.exists()) {
- file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + names[i]
- + ".tbl");
- }
+ file = TPCH.getDataFile(names[i]);
tables[i] = FileUtil.readTextFile(file).split("\n");
paths[i] = file.getAbsolutePath();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
index b14bfa9..aff1677 100644
--- a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
@@ -321,14 +321,14 @@ public class TestTajoCli {
setVar(tajoCli, SessionVars.CLI_FORMATTER_CLASS, TajoCliOutputTestFormatter.class.getName());
ByteArrayOutputStream out = new ByteArrayOutputStream();
- tajoCli = new TajoCli(tajoConf, new String[]{}, System.in, out);
+ TajoCli tajoCli = new TajoCli(tajoConf, new String[]{}, System.in, out);
tajoCli.executeMetaCommand("\\admin -showmasters");
String consoleResult = new String(out.toByteArray());
String masterAddress = tajoCli.getContext().getConf().getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
String host = masterAddress.split(":")[0];
-
+ tajoCli.close();
assertEquals(consoleResult, host + "\n");
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
index 99baeba..1c763e2 100644
--- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
+++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
@@ -26,7 +26,6 @@ import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.client.QueryClient;
-import org.apache.tajo.conf.TajoConf;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -34,11 +33,13 @@ import org.junit.experimental.categories.Category;
import java.net.InetSocketAddress;
import java.sql.*;
-import java.util.*;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
import static org.junit.Assert.*;
-import static org.junit.Assert.assertEquals;
@Category(IntegrationTest.class)
public class TestTajoJdbc extends QueryTestCaseBase {
@@ -113,6 +114,9 @@ public class TestTajoJdbc extends QueryTestCaseBase {
if (stmt != null) {
stmt.close();
}
+ if (conn != null) {
+ conn.close();
+ }
}
}
@@ -194,6 +198,9 @@ public class TestTajoJdbc extends QueryTestCaseBase {
if (stmt != null) {
stmt.close();
}
+ if (conn != null) {
+ conn.close();
+ }
}
}
@@ -494,11 +501,11 @@ public class TestTajoJdbc extends QueryTestCaseBase {
int result;
Statement stmt = null;
ResultSet res = null;
-
+ Connection conn = null;
try {
String connUri = buildConnectionUri(tajoMasterAddress.getHostName(), tajoMasterAddress.getPort(),
DEFAULT_DATABASE_NAME);
- Connection conn = DriverManager.getConnection(connUri);
+ conn = DriverManager.getConnection(connUri);
assertTrue(conn.isValid(100));
stmt = conn.createStatement();
@@ -532,6 +539,10 @@ public class TestTajoJdbc extends QueryTestCaseBase {
if (stmt != null) {
stmt.close();
}
+
+ if(conn != null) {
+ conn.close();
+ }
}
}
@@ -539,11 +550,11 @@ public class TestTajoJdbc extends QueryTestCaseBase {
public void testSortWithDateTime() throws Exception {
Statement stmt = null;
ResultSet res = null;
+ Connection conn = null;
int result;
// skip this test if catalog uses HCatalogStore.
// It is because HCatalogStore does not support Time data type.
-
try {
if (!testingCluster.isHCatalogStoreRunning()) {
executeDDL("create_table_with_date_ddl.sql", "table1");
@@ -551,7 +562,7 @@ public class TestTajoJdbc extends QueryTestCaseBase {
String connUri = buildConnectionUri(tajoMasterAddress.getHostName(),
tajoMasterAddress.getPort(), "TestTajoJdbc");
- Connection conn = DriverManager.getConnection(connUri);
+ conn = DriverManager.getConnection(connUri);
assertTrue(conn.isValid(100));
stmt = conn.createStatement();
@@ -576,6 +587,10 @@ public class TestTajoJdbc extends QueryTestCaseBase {
if (stmt != null) {
stmt.close();
}
+
+ if(conn != null) {
+ conn.close();
+ }
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java
index c1f4178..8ca4cff 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java
@@ -20,34 +20,51 @@ package org.apache.tajo.master.querymaster;
import org.apache.tajo.*;
import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.benchmark.TPCH;
import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.plan.LogicalOptimizer;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.LogicalPlanner;
import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.master.event.QueryEvent;
import org.apache.tajo.master.event.QueryEventType;
import org.apache.tajo.master.session.Session;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.*;
-@Category(IntegrationTest.class)
public class TestKillQuery {
private static TajoTestingCluster cluster;
private static TajoConf conf;
+ private static TajoClient client;
@BeforeClass
public static void setUp() throws Exception {
- cluster = TpchTestBase.getInstance().getTestingCluster();
+ cluster = new TajoTestingCluster();
+ cluster.startMiniClusterInLocal(1);
conf = cluster.getConfiguration();
+ client = new TajoClientImpl(cluster.getConfiguration());
+ File file = TPCH.getDataFile("lineitem");
+ client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) "
+ + "using text location 'file://" + file.getAbsolutePath() + "'");
+ assertTrue(client.existTable("default.lineitem"));
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ if (client != null) client.close();
+ if (cluster != null) cluster.shutdownMiniCluster();
}
@Test
@@ -56,7 +73,7 @@ public class TestKillQuery {
QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
Session session = LocalTajoTestingUtility.createDummySession();
CatalogService catalog = cluster.getMaster().getCatalog();
- String query = "select l_orderkey from lineitem group by l_orderkey";
+ String query = "select l_orderkey, l_partkey from lineitem group by l_orderkey, l_partkey order by l_orderkey";
LogicalPlanner planner = new LogicalPlanner(catalog);
LogicalOptimizer optimizer = new LogicalOptimizer(conf);
@@ -99,7 +116,7 @@ public class TestKillQuery {
q.handle(new QueryEvent(queryId, QueryEventType.KILL));
try{
- cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 10);
+ cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50);
} finally {
assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState());
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
index 18764c2..acd6b71 100644
--- a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
+++ b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
@@ -18,7 +18,10 @@
package org.apache.tajo.scheduler;
-import org.apache.tajo.*;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.benchmark.TPCH;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.client.TajoClientUtil;
@@ -27,47 +30,52 @@ import org.apache.tajo.ipc.ClientProtos;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import java.io.File;
import java.sql.ResultSet;
import static org.junit.Assert.*;
-@Category(IntegrationTest.class)
public class TestFifoScheduler {
private static TajoTestingCluster cluster;
private static TajoConf conf;
private static TajoClient client;
+ private static String query =
+ "select l_orderkey, l_partkey from lineitem group by l_orderkey, l_partkey order by l_orderkey";
@BeforeClass
public static void setUp() throws Exception {
- cluster = TpchTestBase.getInstance().getTestingCluster();
+ cluster = new TajoTestingCluster();
+ cluster.startMiniClusterInLocal(1);
conf = cluster.getConfiguration();
- client = new TajoClientImpl(conf);
+ client = new TajoClientImpl(cluster.getConfiguration());
+ File file = TPCH.getDataFile("lineitem");
+ client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) "
+ + "using text location 'file://" + file.getAbsolutePath() + "'");
+ assertTrue(client.existTable("default.lineitem"));
}
@AfterClass
public static void tearDown() throws Exception {
- client.close();
+ if (client != null) client.close();
+ if (cluster != null) cluster.shutdownMiniCluster();
}
@Test
public final void testKillScheduledQuery() throws Exception {
- ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(1) from lineitem");
- ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select sleep(1) from lineitem");
+ ClientProtos.SubmitQueryResponse res = client.executeQuery(query);
+ ClientProtos.SubmitQueryResponse res2 = client.executeQuery(query);
QueryId queryId = new QueryId(res.getQueryId());
QueryId queryId2 = new QueryId(res2.getQueryId());
cluster.waitForQueryRunning(queryId);
client.killQuery(queryId2);
assertEquals(TajoProtos.QueryState.QUERY_KILLED, client.getQueryStatus(queryId2).getState());
-
- client.killQuery(queryId); // cleanup
}
@Test
public final void testForwardedQuery() throws Exception {
- ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(1) from lineitem");
+ ClientProtos.SubmitQueryResponse res = client.executeQuery(query);
ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select * from lineitem limit 1");
assertTrue(res.getIsForwarded());
assertFalse(res2.getIsForwarded());
@@ -79,16 +87,14 @@ public class TestFifoScheduler {
assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId2).getState());
ResultSet resSet = TajoClientUtil.createResultSet(conf, client, res2);
assertNotNull(resSet);
-
- client.killQuery(queryId); //cleanup
}
@Test
public final void testScheduledQuery() throws Exception {
ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(1) from lineitem");
- ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select sleep(1) from lineitem");
- ClientProtos.SubmitQueryResponse res3 = client.executeQuery("select sleep(1) from lineitem");
- ClientProtos.SubmitQueryResponse res4 = client.executeQuery("select sleep(1) from lineitem");
+ ClientProtos.SubmitQueryResponse res2 = client.executeQuery(query);
+ ClientProtos.SubmitQueryResponse res3 = client.executeQuery(query);
+ ClientProtos.SubmitQueryResponse res4 = client.executeQuery(query);
QueryId queryId = new QueryId(res.getQueryId());
QueryId queryId2 = new QueryId(res2.getQueryId());
@@ -103,9 +109,8 @@ public class TestFifoScheduler {
assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId3).getState());
assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId4).getState());
- client.killQuery(queryId2);
- client.killQuery(queryId3);
client.killQuery(queryId4);
- client.killQuery(queryId);
+ client.killQuery(queryId3);
+ client.killQuery(queryId2);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
index d320077..77aa1d4 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
@@ -20,19 +20,20 @@ package org.apache.tajo.worker;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.service.Service;
-import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.benchmark.TPCH;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.querymaster.QueryInfo;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
@@ -40,29 +41,34 @@ import java.util.Map;
import static org.junit.Assert.*;
public class TestHistory {
- private TajoTestingCluster cluster;
- private TajoMaster master;
- private TajoConf conf;
- private TajoClient client;
-
- @Before
- public void setUp() throws Exception {
- cluster = TpchTestBase.getInstance().getTestingCluster();
+ private static TajoTestingCluster cluster;
+ private static TajoMaster master;
+ private static TajoConf conf;
+ private static TajoClient client;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ cluster = new TajoTestingCluster();
+ cluster.startMiniClusterInLocal(1);
master = cluster.getMaster();
conf = cluster.getConfiguration();
- client = new TajoClientImpl(conf);
+ client = new TajoClientImpl(cluster.getConfiguration());
+ File file = TPCH.getDataFile("lineitem");
+ client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) "
+ + "using text location 'file://" + file.getAbsolutePath() + "'");
+ assertTrue(client.existTable("default.lineitem"));
}
- @After
- public void tearDown() {
- client.close();
+ @AfterClass
+ public static void tearDown() throws IOException {
+ if (client != null) client.close();
+ if (cluster != null) cluster.shutdownMiniCluster();
}
-
@Test
public final void testTaskRunnerHistory() throws IOException, ServiceException, InterruptedException {
int beforeFinishedQueriesCount = master.getContext().getQueryJobManager().getFinishedQueries().size();
- client.executeQueryAndGetResult("select sleep(1) from lineitem");
+ client.executeQueryAndGetResult("select count(*) from lineitem");
Collection<QueryInfo> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries();
assertTrue(finishedQueries.size() > beforeFinishedQueriesCount);
@@ -89,7 +95,7 @@ public class TestHistory {
@Test
public final void testTaskHistory() throws IOException, ServiceException, InterruptedException {
int beforeFinishedQueriesCount = master.getContext().getQueryJobManager().getFinishedQueries().size();
- client.executeQueryAndGetResult("select sleep(1) from lineitem");
+ client.executeQueryAndGetResult("select count(*) from lineitem");
Collection<QueryInfo> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries();
assertTrue(finishedQueries.size() > beforeFinishedQueriesCount);