You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/05/16 07:44:49 UTC
git commit: TAJO-811: add simple fifo scheduler support. (jinho)
Repository: tajo
Updated Branches:
refs/heads/master 9350a8026 -> 4a747a0f8
TAJO-811: add simple fifo scheduler support. (jinho)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/4a747a0f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/4a747a0f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/4a747a0f
Branch: refs/heads/master
Commit: 4a747a0f8756f046173ab2eaa15dc2b03bd78379
Parents: 9350a80
Author: jinossy <ji...@gmail.com>
Authored: Fri May 16 14:43:46 2014 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Fri May 16 14:43:46 2014 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../java/org/apache/tajo/client/TajoAdmin.java | 58 +++-----
.../tajo/master/TajoMasterClientService.java | 5 +-
.../master/querymaster/QueryInProgress.java | 14 +-
.../master/querymaster/QueryJobManager.java | 82 +++++++++--
.../tajo/scheduler/QuerySchedulingInfo.java | 55 +++++++
.../org/apache/tajo/scheduler/Scheduler.java | 41 ++++++
.../tajo/scheduler/SchedulingAlgorithms.java | 47 ++++++
.../tajo/scheduler/SimpleFifoScheduler.java | 147 +++++++++++++++++++
.../src/main/resources/webapps/admin/query.jsp | 10 +-
.../tajo/scheduler/TestFifoScheduler.java | 110 ++++++++++++++
11 files changed, 509 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 222f02e..41ac271 100644
--- a/CHANGES
+++ b/CHANGES
@@ -15,6 +15,8 @@ Release 0.9.0 - unreleased
IMPROVEMENT
+ TAJO-811: add simple fifo scheduler support. (jinho)
+
TAJO-801: Multiple distinct should be supported. (Hyoungjun Kim via hyunsik)
TAJO-807: Implement Round(numeric, int) function.
http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
index 9a0478c..25b91a4 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
@@ -21,7 +21,7 @@ package org.apache.tajo.client;
import com.google.protobuf.ServiceException;
import org.apache.commons.cli.*;
import org.apache.commons.lang.StringUtils;
-import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.QueryId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo;
import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo;
@@ -46,13 +46,9 @@ public class TajoAdmin {
}
final static String line5 = "-----";
- final static String line7 = "-------";
final static String line10 = "----------";
final static String line12 = "------------";
- final static String line15 = "---------------";
- final static String line20 = "--------------------";
final static String line25 = "-------------------------";
- final static String line30 = "------------------------------";
final static String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
static {
@@ -84,18 +80,6 @@ public class TajoAdmin {
formatter.printHelp( "admin [options]", options );
}
- private String getQueryState(QueryState state) {
- String stateStr = "FAILED";
-
- if (TajoClient.isQueryRunnning(state)) {
- stateStr = "RUNNING";
- } else if (state == QueryState.QUERY_SUCCEEDED) {
- stateStr = "SUCCEED";
- }
-
- return stateStr;
- }
-
public void runCommand(String[] args) throws Exception {
CommandLineParser parser = new PosixParser();
CommandLine cmd = parser.parse(options, args);
@@ -195,13 +179,13 @@ public class TajoAdmin {
writer.write("\n");
writer.write("Started Time: " + df.format(queryInfo.getStartTime()));
writer.write("\n");
- String state = getQueryState(queryInfo.getState());
- writer.write("Query State: " + state);
+
+ writer.write("Query State: " + queryInfo.getState().name());
writer.write("\n");
long end = queryInfo.getFinishTime();
long start = queryInfo.getStartTime();
String executionTime = decimalF.format((end-start) / 1000) + " sec";
- if (state.equals("RUNNING") == false) {
+ if (!TajoClient.isQueryRunnning(queryInfo.getState())) {
writer.write("Finished Time: " + df.format(queryInfo.getFinishTime()));
writer.write("\n");
}
@@ -385,24 +369,28 @@ public class TajoAdmin {
ServiceException, SQLException {
List<BriefQueryInfo> queryList = tajoClient.getRunningQueryList();
SimpleDateFormat df = new SimpleDateFormat(DATE_FORMAT);
- String fmt = "%1$-20s %2$-7s %3$-20s %4$-30s%n";
- String line = String.format(fmt, "QueryId", "State",
- "StartTime", "Query");
- writer.write(line);
- line = String.format(fmt, line20, line7, line20, line30);
- writer.write(line);
+ StringBuilder builder = new StringBuilder();
- for (BriefQueryInfo queryInfo : queryList) {
- String queryId = String.format("q_%s_%04d",
- queryInfo.getQueryId().getId(),
- queryInfo.getQueryId().getSeq());
- String state = getQueryState(queryInfo.getState());
- String startTime = df.format(queryInfo.getStartTime());
+ /* print title */
+ builder.append(StringUtils.rightPad("QueryId", 21));
+ builder.append(StringUtils.rightPad("State", 20));
+ builder.append(StringUtils.rightPad("StartTime", 20));
+ builder.append(StringUtils.rightPad("Query", 30)).append("\n");
- String sql = StringUtils.abbreviate(queryInfo.getQuery(), 30);
- line = String.format(fmt, queryId, state, startTime, sql);
- writer.write(line);
+ builder.append(StringUtils.rightPad(StringUtils.repeat("-", 20), 21));
+ builder.append(StringUtils.rightPad(StringUtils.repeat("-", 19), 20));
+ builder.append(StringUtils.rightPad(StringUtils.repeat("-", 19), 20));
+ builder.append(StringUtils.rightPad(StringUtils.repeat("-", 29), 30)).append("\n");
+ writer.write(builder.toString());
+
+ builder = new StringBuilder();
+ for (BriefQueryInfo queryInfo : queryList) {
+ builder.append(StringUtils.rightPad(new QueryId(queryInfo.getQueryId()).toString(), 21));
+ builder.append(StringUtils.rightPad(queryInfo.getState().name(), 20));
+ builder.append(StringUtils.rightPad(df.format(queryInfo.getStartTime()), 20));
+ builder.append(StringUtils.abbreviate(queryInfo.getQuery(), 30)).append("\n");
}
+ writer.write(builder.toString());
}
public void processKill(Writer writer, String queryIdStr)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index f6ad085..97f59ef 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -354,9 +354,8 @@ public class TajoMasterClientService extends AbstractService {
context.getSessionManager().touch(request.getSessionId().getId());
GetQueryListResponse.Builder builder= GetQueryListResponse.newBuilder();
- Collection<QueryInProgress> queries
- = context.getQueryJobManager().getRunningQueries();
-
+ Collection<QueryInProgress> queries = new ArrayList<QueryInProgress>(context.getQueryJobManager().getSubmittedQueries());
+ queries.addAll(context.getQueryJobManager().getRunningQueries());
BriefQueryInfo.Builder infoBuilder = BriefQueryInfo.newBuilder();
for (QueryInProgress queryInProgress : queries) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index dac2d4c..e561a4c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -101,8 +101,10 @@ public class QueryInProgress extends CompositeService {
super.init(conf);
}
- public void kill() {
- queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+ public synchronized void kill() {
+ if(queryMasterRpcClient != null){
+ queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+ }
}
@Override
@@ -202,10 +204,6 @@ public class QueryInProgress extends CompositeService {
}
}
- public QueryMasterProtocolService getQueryMasterRpcClient() {
- return queryMasterRpcClient;
- }
-
private void connectQueryMaster() throws Exception {
InetSocketAddress addr = NetUtils.createSocketAddrForHost(
queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
@@ -260,6 +258,10 @@ public class QueryInProgress extends CompositeService {
return this.queryInfo;
}
+ public boolean isStarted() {
+ return this.querySubmitted.get();
+ }
+
private void heartbeat(QueryInfo queryInfo) {
LOG.info("Received QueryMaster heartbeat:" + queryInfo);
this.queryInfo.setQueryState(queryInfo.getQueryState());
http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index bc5fcad..66db9d6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -26,11 +26,13 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoProtos;
import org.apache.tajo.engine.planner.logical.LogicalRootNode;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.session.Session;
+import org.apache.tajo.scheduler.SimpleFifoScheduler;
import java.util.Collection;
import java.util.Collections;
@@ -45,6 +47,10 @@ public class QueryJobManager extends CompositeService {
private AsyncDispatcher dispatcher;
+ private SimpleFifoScheduler scheduler;
+
+ private final Map<QueryId, QueryInProgress> submittedQueries = new HashMap<QueryId, QueryInProgress>();
+
private final Map<QueryId, QueryInProgress> runningQueries = new HashMap<QueryId, QueryInProgress>();
private final Map<QueryId, QueryInProgress> finishedQueries = new HashMap<QueryId, QueryInProgress>();
@@ -61,6 +67,8 @@ public class QueryJobManager extends CompositeService {
addService(this.dispatcher);
this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler());
+
+ this.scheduler = new SimpleFifoScheduler(this);
} catch (Exception e) {
catchException(null, e);
}
@@ -75,11 +83,13 @@ public class QueryJobManager extends CompositeService {
eachQueryInProgress.stop();
}
}
+ this.scheduler.stop();
super.stop();
}
@Override
public void start() {
+ this.scheduler.start();
super.start();
}
@@ -87,6 +97,10 @@ public class QueryJobManager extends CompositeService {
return dispatcher.getEventHandler();
}
+ public Collection<QueryInProgress> getSubmittedQueries() {
+ return Collections.unmodifiableCollection(submittedQueries.values());
+ }
+
public Collection<QueryInProgress> getRunningQueries() {
return Collections.unmodifiableCollection(runningQueries.values());
}
@@ -102,40 +116,75 @@ public class QueryJobManager extends CompositeService {
QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql,
jsonExpr, plan);
- synchronized(runningQueries) {
- runningQueries.put(queryId, queryInProgress);
+ synchronized (submittedQueries) {
+ queryInProgress.getQueryInfo().setQueryMaster("");
+ submittedQueries.put(queryInProgress.getQueryId(), queryInProgress);
+ }
+
+ scheduler.addQuery(queryInProgress);
+ return queryInProgress.getQueryInfo();
+ }
+
+ public QueryInfo startQueryJob(QueryId queryId) throws Exception {
+
+ QueryInProgress queryInProgress;
+
+ synchronized (submittedQueries) {
+ queryInProgress = submittedQueries.remove(queryId);
+ }
+
+ synchronized (runningQueries) {
+ runningQueries.put(queryInProgress.getQueryId(), queryInProgress);
}
addService(queryInProgress);
queryInProgress.init(getConfig());
queryInProgress.start();
- if(!queryInProgress.startQueryMaster()) {
- return null;
+ if (!queryInProgress.startQueryMaster()) {
+ stopQuery(queryId);
}
return queryInProgress.getQueryInfo();
}
+ public TajoMaster.MasterContext getMasterContext() {
+ return masterContext;
+ }
+
class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
@Override
public void handle(QueryJobEvent event) {
- QueryInProgress queryInProgress = null;
- synchronized(runningQueries) {
- queryInProgress = runningQueries.get(event.getQueryInfo().getQueryId());
- if(queryInProgress == null) {
- LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
- return;
+ QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId());
+ if(queryInProgress == null) {
+ LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
+ return;
+ }
+ if(queryInProgress.isStarted()){
+ queryInProgress.getEventHandler().handle(event);
+ } else {
+ if(event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL){
+ scheduler.removeQuery(queryInProgress.getQueryId());
+ queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
+
+ stopQuery(queryInProgress.getQueryId());
}
}
- queryInProgress.getEventHandler().handle(event);
}
}
public QueryInProgress getQueryInProgress(QueryId queryId) {
- synchronized(runningQueries) {
- return runningQueries.get(queryId);
+ QueryInProgress queryInProgress;
+ synchronized (submittedQueries) {
+ queryInProgress = submittedQueries.get(queryId);
}
+
+ if (queryInProgress == null) {
+ synchronized (runningQueries) {
+ queryInProgress = runningQueries.get(queryId);
+ }
+ }
+ return queryInProgress;
}
public QueryInProgress getFinishedQuery(QueryId queryId) {
@@ -149,8 +198,15 @@ public class QueryJobManager extends CompositeService {
QueryInProgress queryInProgress = getQueryInProgress(queryId);
if(queryInProgress != null) {
queryInProgress.stop();
+ synchronized(submittedQueries) {
+ submittedQueries.remove(queryId);
+ }
+
synchronized(runningQueries) {
runningQueries.remove(queryId);
+ }
+
+ synchronized(finishedQueries) {
finishedQueries.put(queryId, queryInProgress);
}
} else {
http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java
new file mode 100644
index 0000000..d9932bd
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.scheduler;
+
+import com.google.common.base.Objects;
+import org.apache.tajo.QueryId;
+
+public class QuerySchedulingInfo {
+ private QueryId queryId;
+ private Integer priority;
+ private Long startTime;
+
+ public QuerySchedulingInfo(QueryId queryId, Integer priority, Long startTime) {
+ this.queryId = queryId;
+ this.priority = priority;
+ this.startTime = startTime;
+ }
+
+ public QueryId getQueryId() {
+ return queryId;
+ }
+
+ public Integer getPriority() {
+ return priority;
+ }
+
+ public Long getStartTime() {
+ return startTime;
+ }
+
+ public String getName() {
+ return queryId.getId();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(startTime, getName(), priority);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java
new file mode 100644
index 0000000..d74280c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.scheduler;
+
+import org.apache.tajo.QueryId;
+import org.apache.tajo.master.querymaster.QueryInProgress;
+
+import java.util.List;
+
+/**
+ * Contract of a query scheduler: queries are handed over as {@link QueryInProgress}
+ * instances and can be removed again by their {@link QueryId}.
+ */
+public interface Scheduler {
+
+  /** Scheduling policies a scheduler can report through {@link #getMode()}. */
+  enum Mode {
+    FIFO
+  }
+
+  /** @return the scheduling policy of this scheduler */
+  Mode getMode();
+
+  /** @return the name of this scheduler */
+  String getName();
+
+  /**
+   * Hands a query over to the scheduler.
+   *
+   * @param resource the query to schedule
+   * @return a boolean result defined by the implementation
+   */
+  boolean addQuery(QueryInProgress resource);
+
+  /**
+   * Removes a query from the scheduler.
+   *
+   * @param queryId identifier of the query to remove
+   * @return a boolean result defined by the implementation
+   */
+  boolean removeQuery(QueryId queryId);
+
+  /** @return the queries the scheduler considers to be running */
+  List<QueryInProgress> getRunningQueries();
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java
new file mode 100644
index 0000000..9c9b16d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.scheduler;
+
+import java.util.Comparator;
+
+/**
+ * Utility class containing scheduling algorithms used in the scheduler.
+ */
+
+public class SchedulingAlgorithms {
+  /**
+   * Orders {@link QuerySchedulingInfo} entries by priority, then by start time, and
+   * finally by name, as in the default FIFO scheduler in Tajo.
+   *
+   * <p>Priority and start time are compared with their natural {@code compareTo}
+   * ordering. Neither value may be {@code null}, otherwise a
+   * {@link NullPointerException} is thrown.
+   */
+  public static class FifoComparator implements Comparator<QuerySchedulingInfo> {
+    /**
+     * @param q1 first entry to compare
+     * @param q2 second entry to compare
+     * @return a negative, zero or positive value when {@code q1} sorts before, equal to
+     *         or after {@code q2}
+     */
+    @Override
+    public int compare(QuerySchedulingInfo q1, QuerySchedulingInfo q2) {
+      int res = q1.getPriority().compareTo(q2.getPriority());
+      if (res == 0) {
+        // Compare the boxed values directly: subtracting two longs can overflow, and
+        // there is no need to go through floating point just to obtain a sign.
+        res = q1.getStartTime().compareTo(q2.getStartTime());
+      }
+      if (res == 0) {
+        // In the rare case where jobs were submitted at the exact same time,
+        // fall back to the name as the last tie-breaker.
+        res = q1.getName().compareTo(q2.getName());
+      }
+      return res;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java
new file mode 100644
index 0000000..87968a5
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.scheduler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.master.querymaster.QueryInProgress;
+import org.apache.tajo.master.querymaster.QueryJobManager;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A {@link Scheduler} that keeps submitted queries in a pool and starts them one at a
+ * time from a dedicated "Query Processor" thread.
+ *
+ * <p>The processor thread wakes up at least every 500 ms (or earlier when
+ * {@link #addQuery(QueryInProgress)} notifies it) and, when the {@link QueryJobManager}
+ * reports no running queries, polls the next entry in {@link SchedulingAlgorithms.FifoComparator}
+ * order and asks the manager to start it.
+ *
+ * <p>The pool is touched by the threads calling {@link #addQuery}, {@link #removeQuery} and
+ * {@link #stop()} as well as by the processor thread, so every access to it is guarded by
+ * synchronizing on the pool itself.
+ */
+public class SimpleFifoScheduler implements Scheduler {
+  private static final Log LOG = LogFactory.getLog(SimpleFifoScheduler.class.getName());
+  private static final Comparator<QuerySchedulingInfo> COMPARATOR = new SchedulingAlgorithms.FifoComparator();
+
+  /** Queries waiting to be started; guarded by {@code synchronized (pool)}. */
+  private final LinkedList<QuerySchedulingInfo> pool = new LinkedList<QuerySchedulingInfo>();
+  private final Thread queryProcessor;
+  /**
+   * Per-instance stop flag. It must not be static: a shared flag would make the
+   * processor thread of every scheduler created after the first {@link #stop()} exit
+   * immediately.
+   */
+  private final AtomicBoolean stopped = new AtomicBoolean();
+  private final QueryJobManager manager;
+
+  /**
+   * Creates the scheduler and its (not yet started) processor thread.
+   *
+   * @param manager the job manager used to look up running queries and to start or stop queries
+   */
+  public SimpleFifoScheduler(QueryJobManager manager) {
+    this.manager = manager;
+    this.queryProcessor = new Thread(new QueryProcessor());
+    this.queryProcessor.setName("Query Processor");
+  }
+
+  @Override
+  public Mode getMode() {
+    return Mode.FIFO;
+  }
+
+  /** @return the name of the underlying {@link QueryJobManager} */
+  @Override
+  public String getName() {
+    return manager.getName();
+  }
+
+  /**
+   * Appends the query to the pool with priority 1 and wakes the processor thread when
+   * no query is currently running.
+   *
+   * @param queryInProgress the query to schedule
+   * @return the result of adding the entry to the pool
+   */
+  @Override
+  public boolean addQuery(QueryInProgress queryInProgress) {
+    QuerySchedulingInfo querySchedulingInfo =
+        new QuerySchedulingInfo(queryInProgress.getQueryId(), 1, queryInProgress.getStartTime());
+
+    int qSize;
+    boolean result;
+    synchronized (pool) {
+      qSize = pool.size();
+      result = pool.add(querySchedulingInfo);
+    }
+
+    // Log outside the lock; qSize is the pool size before this query was added.
+    if (qSize != 0 && qSize % 100 == 0) {
+      LOG.info("Size of Fifo queue is " + qSize);
+    }
+
+    if (getRunningQueries().size() == 0) {
+      wakeupProcessor();
+    }
+    return result;
+  }
+
+  /**
+   * Removes the pooled entry of the given query.
+   *
+   * @param queryId identifier of the query to remove
+   * @return {@code true} if an entry was removed, {@code false} if the query was not pooled
+   */
+  @Override
+  public boolean removeQuery(QueryId queryId) {
+    // Look up and remove under one lock so the entry cannot disappear in between.
+    synchronized (pool) {
+      return pool.remove(getQueryByQueryId(queryId));
+    }
+  }
+
+  /**
+   * Finds the pooled entry of the given query.
+   *
+   * @param queryId identifier of the query to look up
+   * @return the pooled entry, or {@code null} if the query is not in the pool
+   */
+  public QuerySchedulingInfo getQueryByQueryId(QueryId queryId) {
+    synchronized (pool) {
+      for (QuerySchedulingInfo querySchedulingInfo : pool) {
+        if (querySchedulingInfo.getQueryId().equals(queryId)) {
+          return querySchedulingInfo;
+        }
+      }
+    }
+    return null;
+  }
+
+  /** @return a copy of the manager's collection of running queries */
+  @Override
+  public List<QueryInProgress> getRunningQueries() {
+    return new ArrayList<QueryInProgress>(manager.getRunningQueries());
+  }
+
+  /** Starts the processor thread. */
+  public void start() {
+    queryProcessor.start();
+  }
+
+  /**
+   * Stops the scheduler: marks it stopped, clears the pool and interrupts the processor
+   * thread. Calls after the first one are no-ops.
+   */
+  public void stop() {
+    if (stopped.getAndSet(true)) {
+      return;
+    }
+    synchronized (pool) {
+      pool.clear();
+    }
+    synchronized (queryProcessor) {
+      queryProcessor.interrupt();
+    }
+  }
+
+  /** Sorts the pool with {@link #COMPARATOR} and removes its head, or returns {@code null} if empty. */
+  private QuerySchedulingInfo pollScheduledQuery() {
+    synchronized (pool) {
+      if (pool.size() > 1) {
+        Collections.sort(pool, COMPARATOR);
+      }
+      return pool.poll();
+    }
+  }
+
+  /** Wakes the processor thread if it is waiting between scheduling rounds. */
+  private void wakeupProcessor() {
+    synchronized (queryProcessor) {
+      queryProcessor.notifyAll();
+    }
+  }
+
+  /** Scheduling loop executed by the "Query Processor" thread. */
+  private final class QueryProcessor implements Runnable {
+    @Override
+    public void run() {
+
+      QuerySchedulingInfo query;
+
+      while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+        query = null;
+        // Only one query runs at a time: poll the pool only when nothing is running.
+        if (getRunningQueries().size() == 0) {
+          query = pollScheduledQuery();
+        }
+
+        if (query != null) {
+          try {
+            manager.startQueryJob(query.getQueryId());
+          } catch (Throwable t) {
+            LOG.fatal("Exception during query startup:", t);
+            manager.stopQuery(query.getQueryId());
+          }
+        }
+
+        synchronized (queryProcessor) {
+          try {
+            // Wait for a wake-up from addQuery, at most 500 ms.
+            queryProcessor.wait(500);
+          } catch (InterruptedException e) {
+            if (stopped.get()) {
+              break;
+            }
+            LOG.warn("Exception during shutdown: ", e);
+          }
+        }
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp
index 6f15a0e..4e8d7b0 100644
--- a/tajo-core/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query.jsp
@@ -26,16 +26,16 @@
<%@ page import="org.apache.tajo.util.StringUtils" %>
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
<%@ page import="java.text.SimpleDateFormat" %>
-<%@ page import="java.util.Collection" %>
-<%@ page import="java.util.HashMap" %>
-<%@ page import="java.util.List" %>
-<%@ page import="java.util.Map" %>
+<%@ page import="java.util.*" %>
<%
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
List<QueryInProgress> runningQueries =
- JSPUtil.sortQueryInProgress(master.getContext().getQueryJobManager().getRunningQueries(), true);
+ new ArrayList<QueryInProgress>(master.getContext().getQueryJobManager().getSubmittedQueries());
+
+ runningQueries.addAll(master.getContext().getQueryJobManager().getRunningQueries());
+ JSPUtil.sortQueryInProgress(runningQueries, true);
List<QueryInProgress> finishedQueries =
JSPUtil.sortQueryInProgress(master.getContext().getQueryJobManager().getFinishedQueries(), true);
http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
new file mode 100644
index 0000000..76f22d0
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.scheduler;
+
+import com.google.protobuf.ServiceException;
+import org.apache.tajo.*;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ClientProtos;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+
+import static org.junit.Assert.*;
+
+/**
+ * Integration tests that submit queries through a {@link TajoClient} connected to the
+ * testing cluster obtained from {@code TpchTestBase} and check the query states the
+ * client reports afterwards.
+ *
+ * NOTE(review): every test waits a fixed one second before asserting states, so the
+ * assertions are presumably timing-sensitive - confirm on slow machines.
+ */
+@Category(IntegrationTest.class)
+public class TestFifoScheduler {
+ private static TajoTestingCluster cluster;
+ private static TajoConf conf;
+ private static TajoClient client;
+
+ /** Creates one client, shared by all tests, on the TPC-H testing cluster's configuration. */
+ @BeforeClass
+ public static void setUp() throws Exception {
+ cluster = TpchTestBase.getInstance().getTestingCluster();
+ conf = cluster.getConfiguration();
+ client = new TajoClient(conf);
+ }
+
+ /** Closes the shared client. */
+ @AfterClass
+ public static void tearDown() throws Exception {
+ client.close();
+ }
+
+ /**
+ * Submits two sleeping queries, expects the second one to report QUERY_MASTER_INIT
+ * (presumably the state of a query that has not been started yet - verify), and then
+ * expects each query to report QUERY_KILLED right after it is killed.
+ */
+ @Test
+ public final void testKillScheduledQuery() throws IOException, ServiceException, InterruptedException {
+ ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(2) from lineitem");
+ ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select sleep(2) from lineitem");
+ Thread.sleep(1000);
+ QueryId queryId = new QueryId(res.getQueryId());
+ QueryId queryId2 = new QueryId(res2.getQueryId());
+ assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId2).getState());
+
+ // Kill the second query first, then the first one.
+ client.killQuery(queryId2);
+ assertEquals(TajoProtos.QueryState.QUERY_KILLED, client.getQueryStatus(queryId2).getState());
+ client.killQuery(queryId);
+ assertEquals(TajoProtos.QueryState.QUERY_KILLED, client.getQueryStatus(queryId).getState());
+ }
+
+ /**
+ * Submits a sleeping query followed by a "select * ... limit 1" query and expects the
+ * second one to be answered without being forwarded and to report QUERY_SUCCEEDED while
+ * the first one still reports QUERY_RUNNING.
+ */
+ @Test
+ public final void testForwardedQuery() throws IOException, ServiceException, InterruptedException {
+ ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(2) from lineitem");
+ ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select * from lineitem limit 1");
+
+ Thread.sleep(1000);
+ assertFalse(res2.getIsForwarded());
+ QueryId queryId2 = new QueryId(res2.getQueryId());
+ assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId2).getState());
+ ResultSet resSet = TajoClient.createResultSet(client, res2);
+ assertNotNull(resSet);
+
+ QueryId queryId = new QueryId(res.getQueryId());
+ assertEquals(TajoProtos.QueryState.QUERY_RUNNING, client.getQueryStatus(queryId).getState());
+ // Clean up the still-running sleeping query.
+ client.killQuery(queryId);
+ }
+
+ /**
+ * Submits four sleeping queries and expects the first one to report QUERY_RUNNING while
+ * the other three report QUERY_MASTER_INIT; all four are killed afterwards.
+ */
+ @Test
+ public final void testScheduledQuery() throws IOException, ServiceException, InterruptedException {
+ ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(2) from lineitem");
+ ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select sleep(2) from lineitem");
+ ClientProtos.SubmitQueryResponse res3 = client.executeQuery("select sleep(2) from lineitem");
+ ClientProtos.SubmitQueryResponse res4 = client.executeQuery("select sleep(2) from lineitem");
+
+ Thread.sleep(1000);
+
+ QueryId queryId = new QueryId(res.getQueryId());
+ QueryId queryId2 = new QueryId(res2.getQueryId());
+ QueryId queryId3 = new QueryId(res3.getQueryId());
+ QueryId queryId4 = new QueryId(res4.getQueryId());
+ assertEquals(TajoProtos.QueryState.QUERY_RUNNING, client.getQueryStatus(queryId).getState());
+
+ assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId2).getState());
+ assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId3).getState());
+ assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId4).getState());
+
+ // Kill the three non-running queries before the running one.
+ client.killQuery(queryId2);
+ client.killQuery(queryId3);
+ client.killQuery(queryId4);
+ client.killQuery(queryId);
+ }
+}