You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/05/16 07:44:49 UTC

git commit: TAJO-811: add simple fifo scheduler support. (jinho)

Repository: tajo
Updated Branches:
  refs/heads/master 9350a8026 -> 4a747a0f8


TAJO-811: add simple fifo scheduler support. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/4a747a0f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/4a747a0f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/4a747a0f

Branch: refs/heads/master
Commit: 4a747a0f8756f046173ab2eaa15dc2b03bd78379
Parents: 9350a80
Author: jinossy <ji...@gmail.com>
Authored: Fri May 16 14:43:46 2014 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Fri May 16 14:43:46 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../java/org/apache/tajo/client/TajoAdmin.java  |  58 +++-----
 .../tajo/master/TajoMasterClientService.java    |   5 +-
 .../master/querymaster/QueryInProgress.java     |  14 +-
 .../master/querymaster/QueryJobManager.java     |  82 +++++++++--
 .../tajo/scheduler/QuerySchedulingInfo.java     |  55 +++++++
 .../org/apache/tajo/scheduler/Scheduler.java    |  41 ++++++
 .../tajo/scheduler/SchedulingAlgorithms.java    |  47 ++++++
 .../tajo/scheduler/SimpleFifoScheduler.java     | 147 +++++++++++++++++++
 .../src/main/resources/webapps/admin/query.jsp  |  10 +-
 .../tajo/scheduler/TestFifoScheduler.java       | 110 ++++++++++++++
 11 files changed, 509 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 222f02e..41ac271 100644
--- a/CHANGES
+++ b/CHANGES
@@ -15,6 +15,8 @@ Release 0.9.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-811: add simple fifo scheduler support. (jinho)
+
     TAJO-801: Multiple distinct should be supported. (Hyoungjun Kim via hyunsik)
 
     TAJO-807: Implement Round(numeric, int) function.

http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
index 9a0478c..25b91a4 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
@@ -21,7 +21,7 @@ package org.apache.tajo.client;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.cli.*;
 import org.apache.commons.lang.StringUtils;
-import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.QueryId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo;
 import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo;
@@ -46,13 +46,9 @@ public class TajoAdmin {
   }
 
   final static String line5   = "-----";
-  final static String line7   = "-------";
   final static String line10  = "----------";
   final static String line12  = "------------";
-  final static String line15  = "---------------";
-  final static String line20  = "--------------------";
   final static String line25  = "-------------------------";
-  final static String line30  = "------------------------------";
   final static String DATE_FORMAT  = "yyyy-MM-dd HH:mm:ss";
 
   static {
@@ -84,18 +80,6 @@ public class TajoAdmin {
     formatter.printHelp( "admin [options]", options );
   }
 
-  private String getQueryState(QueryState state) {
-    String stateStr = "FAILED";
-
-    if (TajoClient.isQueryRunnning(state)) {
-      stateStr = "RUNNING";
-    } else if (state == QueryState.QUERY_SUCCEEDED) {
-      stateStr = "SUCCEED";
-    }
-
-    return stateStr;
-  }
-
   public void runCommand(String[] args) throws Exception {
     CommandLineParser parser = new PosixParser();
     CommandLine cmd = parser.parse(options, args);
@@ -195,13 +179,13 @@ public class TajoAdmin {
         writer.write("\n");
         writer.write("Started Time: " + df.format(queryInfo.getStartTime()));
         writer.write("\n");
-        String state = getQueryState(queryInfo.getState());
-        writer.write("Query State: " + state);
+
+        writer.write("Query State: " + queryInfo.getState().name());
         writer.write("\n");
         long end = queryInfo.getFinishTime();
         long start = queryInfo.getStartTime();
         String executionTime = decimalF.format((end-start) / 1000) + " sec";
-        if (state.equals("RUNNING") == false) {
+        if (!TajoClient.isQueryRunnning(queryInfo.getState())) {
           writer.write("Finished Time: " + df.format(queryInfo.getFinishTime()));
           writer.write("\n");
         }
@@ -385,24 +369,28 @@ public class TajoAdmin {
       ServiceException, SQLException {
     List<BriefQueryInfo> queryList = tajoClient.getRunningQueryList();
     SimpleDateFormat df = new SimpleDateFormat(DATE_FORMAT);
-    String fmt = "%1$-20s %2$-7s %3$-20s %4$-30s%n";
-    String line = String.format(fmt, "QueryId", "State", 
-                  "StartTime", "Query");
-    writer.write(line);
-    line = String.format(fmt, line20, line7, line20, line30);
-    writer.write(line);
+    StringBuilder builder = new StringBuilder();
 
-    for (BriefQueryInfo queryInfo : queryList) {
-        String queryId = String.format("q_%s_%04d",
-                                       queryInfo.getQueryId().getId(),
-                                       queryInfo.getQueryId().getSeq());
-        String state = getQueryState(queryInfo.getState());
-        String startTime = df.format(queryInfo.getStartTime());
+    /* print title */
+    builder.append(StringUtils.rightPad("QueryId", 21));
+    builder.append(StringUtils.rightPad("State", 20));
+    builder.append(StringUtils.rightPad("StartTime", 20));
+    builder.append(StringUtils.rightPad("Query", 30)).append("\n");
 
-        String sql = StringUtils.abbreviate(queryInfo.getQuery(), 30);
-        line = String.format(fmt, queryId, state, startTime, sql);
-        writer.write(line);
+    builder.append(StringUtils.rightPad(StringUtils.repeat("-", 20), 21));
+    builder.append(StringUtils.rightPad(StringUtils.repeat("-", 19), 20));
+    builder.append(StringUtils.rightPad(StringUtils.repeat("-", 19), 20));
+    builder.append(StringUtils.rightPad(StringUtils.repeat("-", 29), 30)).append("\n");
+    writer.write(builder.toString());
+
+    builder = new StringBuilder();
+    for (BriefQueryInfo queryInfo : queryList) {
+      builder.append(StringUtils.rightPad(new QueryId(queryInfo.getQueryId()).toString(), 21));
+      builder.append(StringUtils.rightPad(queryInfo.getState().name(), 20));
+      builder.append(StringUtils.rightPad(df.format(queryInfo.getStartTime()), 20));
+      builder.append(StringUtils.abbreviate(queryInfo.getQuery(), 30)).append("\n");
     }
+    writer.write(builder.toString());
   }
 
   public void processKill(Writer writer, String queryIdStr)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index f6ad085..97f59ef 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -354,9 +354,8 @@ public class TajoMasterClientService extends AbstractService {
         context.getSessionManager().touch(request.getSessionId().getId());
         GetQueryListResponse.Builder builder= GetQueryListResponse.newBuilder();
 
-        Collection<QueryInProgress> queries
-          = context.getQueryJobManager().getRunningQueries();
-
+        Collection<QueryInProgress> queries = new ArrayList<QueryInProgress>(context.getQueryJobManager().getSubmittedQueries());
+        queries.addAll(context.getQueryJobManager().getRunningQueries());
         BriefQueryInfo.Builder infoBuilder = BriefQueryInfo.newBuilder();
 
         for (QueryInProgress queryInProgress : queries) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index dac2d4c..e561a4c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -101,8 +101,10 @@ public class QueryInProgress extends CompositeService {
     super.init(conf);
   }
 
-  public void kill() {
-    queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+  public synchronized void kill() {
+    if(queryMasterRpcClient != null){
+      queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+    }
   }
 
   @Override
@@ -202,10 +204,6 @@ public class QueryInProgress extends CompositeService {
     }
   }
 
-  public QueryMasterProtocolService getQueryMasterRpcClient() {
-    return queryMasterRpcClient;
-  }
-
   private void connectQueryMaster() throws Exception {
     InetSocketAddress addr = NetUtils.createSocketAddrForHost(
         queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
@@ -260,6 +258,10 @@ public class QueryInProgress extends CompositeService {
     return this.queryInfo;
   }
 
+  public boolean isStarted() {
+    return this.querySubmitted.get();
+  }
+
   private void heartbeat(QueryInfo queryInfo) {
     LOG.info("Received QueryMaster heartbeat:" + queryInfo);
     this.queryInfo.setQueryState(queryInfo.getQueryState());

http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index bc5fcad..66db9d6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -26,11 +26,13 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoProtos;
 import org.apache.tajo.engine.planner.logical.LogicalRootNode;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.session.Session;
+import org.apache.tajo.scheduler.SimpleFifoScheduler;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -45,6 +47,10 @@ public class QueryJobManager extends CompositeService {
 
   private AsyncDispatcher dispatcher;
 
+  private SimpleFifoScheduler scheduler;
+
+  private final Map<QueryId, QueryInProgress> submittedQueries = new HashMap<QueryId, QueryInProgress>();
+
   private final Map<QueryId, QueryInProgress> runningQueries = new HashMap<QueryId, QueryInProgress>();
 
   private final Map<QueryId, QueryInProgress> finishedQueries = new HashMap<QueryId, QueryInProgress>();
@@ -61,6 +67,8 @@ public class QueryJobManager extends CompositeService {
       addService(this.dispatcher);
 
       this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler());
+
+      this.scheduler = new SimpleFifoScheduler(this);
     } catch (Exception e) {
       catchException(null, e);
     }
@@ -75,11 +83,13 @@ public class QueryJobManager extends CompositeService {
         eachQueryInProgress.stop();
       }
     }
+    this.scheduler.stop();
     super.stop();
   }
 
   @Override
   public void start() {
+    this.scheduler.start();
     super.start();
   }
 
@@ -87,6 +97,10 @@ public class QueryJobManager extends CompositeService {
     return dispatcher.getEventHandler();
   }
 
+  public Collection<QueryInProgress> getSubmittedQueries() {
+    return Collections.unmodifiableCollection(submittedQueries.values());
+  }
+
   public Collection<QueryInProgress> getRunningQueries() {
     return Collections.unmodifiableCollection(runningQueries.values());
   }
@@ -102,40 +116,75 @@ public class QueryJobManager extends CompositeService {
     QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql,
         jsonExpr, plan);
 
-    synchronized(runningQueries) {
-      runningQueries.put(queryId, queryInProgress);
+    synchronized (submittedQueries) {
+      queryInProgress.getQueryInfo().setQueryMaster("");
+      submittedQueries.put(queryInProgress.getQueryId(), queryInProgress);
+    }
+
+    scheduler.addQuery(queryInProgress);
+    return queryInProgress.getQueryInfo();
+  }
+
+  public QueryInfo startQueryJob(QueryId queryId) throws Exception {
+
+    QueryInProgress queryInProgress;
+
+    synchronized (submittedQueries) {
+      queryInProgress = submittedQueries.remove(queryId);
+    }
+
+    synchronized (runningQueries) {
+      runningQueries.put(queryInProgress.getQueryId(), queryInProgress);
     }
 
     addService(queryInProgress);
     queryInProgress.init(getConfig());
     queryInProgress.start();
 
-    if(!queryInProgress.startQueryMaster()) {
-      return null;
+    if (!queryInProgress.startQueryMaster()) {
+      stopQuery(queryId);
     }
 
     return queryInProgress.getQueryInfo();
   }
 
+  public TajoMaster.MasterContext getMasterContext() {
+    return masterContext;
+  }
+
   class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
     @Override
     public void handle(QueryJobEvent event) {
-      QueryInProgress queryInProgress = null;
-      synchronized(runningQueries) {
-        queryInProgress = runningQueries.get(event.getQueryInfo().getQueryId());
-        if(queryInProgress == null) {
-          LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
-          return;
+      QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId());
+      if(queryInProgress == null) {
+        LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
+        return;
+      }
+      if(queryInProgress.isStarted()){
+        queryInProgress.getEventHandler().handle(event);
+      } else {
+        if(event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL){
+          scheduler.removeQuery(queryInProgress.getQueryId());
+          queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
+
+          stopQuery(queryInProgress.getQueryId());
         }
       }
-      queryInProgress.getEventHandler().handle(event);
     }
   }
 
   public QueryInProgress getQueryInProgress(QueryId queryId) {
-    synchronized(runningQueries) {
-      return runningQueries.get(queryId);
+    QueryInProgress queryInProgress;
+    synchronized (submittedQueries) {
+      queryInProgress = submittedQueries.get(queryId);
     }
+
+    if (queryInProgress == null) {
+      synchronized (runningQueries) {
+        queryInProgress = runningQueries.get(queryId);
+      }
+    }
+    return queryInProgress;
   }
 
   public QueryInProgress getFinishedQuery(QueryId queryId) {
@@ -149,8 +198,15 @@ public class QueryJobManager extends CompositeService {
     QueryInProgress queryInProgress = getQueryInProgress(queryId);
     if(queryInProgress != null) {
       queryInProgress.stop();
+      synchronized(submittedQueries) {
+        submittedQueries.remove(queryId);
+      }
+
       synchronized(runningQueries) {
         runningQueries.remove(queryId);
+      }
+
+      synchronized(finishedQueries) {
         finishedQueries.put(queryId, queryInProgress);
       }
     } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java
new file mode 100644
index 0000000..d9932bd
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.scheduler;
+
+import com.google.common.base.Objects;
+import org.apache.tajo.QueryId;
+
+public class QuerySchedulingInfo {
+  private QueryId queryId;
+  private Integer priority;
+  private Long startTime;
+
+  public QuerySchedulingInfo(QueryId queryId, Integer priority, Long startTime) {
+    this.queryId = queryId;
+    this.priority = priority;
+    this.startTime = startTime;
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  public Integer getPriority() {
+    return priority;
+  }
+
+  public Long getStartTime() {
+    return startTime;
+  }
+
+  public String getName() {
+    return queryId.getId();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(startTime, getName(), priority);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java
new file mode 100644
index 0000000..d74280c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.scheduler;
+
+import org.apache.tajo.QueryId;
+import org.apache.tajo.master.querymaster.QueryInProgress;
+
+import java.util.List;
+
+public interface Scheduler {
+
+  public Mode getMode();
+
+  public String getName();
+
+  public boolean addQuery(QueryInProgress resource);
+
+  public boolean removeQuery(QueryId queryId);
+
+  public List<QueryInProgress> getRunningQueries();
+
+  public enum Mode {
+    FIFO
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java
new file mode 100644
index 0000000..9c9b16d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.scheduler;
+
+import java.util.Comparator;
+
+/**
+ * Utility class containing scheduling algorithms used in the scheduler.
+ */
+
+public class SchedulingAlgorithms  {
+  /**
+   * Compares QuerySchedulingInfo entries by priority, then by start time, and
+   * finally by name, so that the FIFO scheduler gets a deterministic ordering.
+   */
+  public static class FifoComparator implements Comparator<QuerySchedulingInfo> {
+    @Override
+    public int compare(QuerySchedulingInfo q1, QuerySchedulingInfo q2) {
+      int res = q1.getPriority().compareTo(q2.getPriority());
+      if (res == 0) {
+        res = (int) Math.signum(q1.getStartTime() - q2.getStartTime());
+      }
+      if (res == 0) {
+        // In the rare case where two queries have the exact same start time,
+        // compare them by name (QuerySchedulingInfo#getName()) to get a deterministic ordering
+        res = q1.getName().compareTo(q2.getName());
+      }
+      return res;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java
new file mode 100644
index 0000000..87968a5
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.scheduler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.master.querymaster.QueryInProgress;
+import org.apache.tajo.master.querymaster.QueryJobManager;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SimpleFifoScheduler implements Scheduler {
+  private static final Log LOG = LogFactory.getLog(SimpleFifoScheduler.class.getName());
+  private LinkedList<QuerySchedulingInfo> pool = new LinkedList<QuerySchedulingInfo>();
+  private final Thread queryProcessor;
+  private static AtomicBoolean stopped = new AtomicBoolean();
+  private QueryJobManager manager;
+  private Comparator<QuerySchedulingInfo> COMPARATOR = new SchedulingAlgorithms.FifoComparator();
+
+  public SimpleFifoScheduler(QueryJobManager manager) {
+    this.manager = manager;
+    this.queryProcessor = new Thread(new QueryProcessor());
+    this.queryProcessor.setName("Query Processor");
+  }
+
+  @Override
+  public Mode getMode() {
+    return Mode.FIFO;
+  }
+
+  @Override
+  public String getName() {
+    return manager.getName();
+  }
+
+  @Override
+  public boolean addQuery(QueryInProgress queryInProgress) {
+    int qSize = pool.size();
+    if (qSize != 0 && qSize % 100 == 0) {
+      LOG.info("Size of Fifo queue is " + qSize);
+    }
+
+    QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1, queryInProgress.getStartTime());
+    boolean result = pool.add(querySchedulingInfo);
+    if (getRunningQueries().size() == 0) wakeupProcessor();
+    return result;
+  }
+
+  @Override
+  public boolean removeQuery(QueryId queryId) {
+    return pool.remove(getQueryByQueryId(queryId));
+  }
+
+  public QuerySchedulingInfo getQueryByQueryId(QueryId queryId) {
+    for (QuerySchedulingInfo querySchedulingInfo : pool) {
+      if (querySchedulingInfo.getQueryId().equals(queryId)) {
+        return querySchedulingInfo;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public List<QueryInProgress> getRunningQueries() {
+    return new ArrayList<QueryInProgress>(manager.getRunningQueries());
+  }
+
+  public void start() {
+    queryProcessor.start();
+  }
+
+  public void stop() {
+    if (stopped.getAndSet(true)) {
+      return;
+    }
+    pool.clear();
+    synchronized (queryProcessor) {
+      queryProcessor.interrupt();
+    }
+  }
+
+  private QuerySchedulingInfo pollScheduledQuery() {
+    if (pool.size() > 1) {
+      Collections.sort(pool, COMPARATOR);
+    }
+    return pool.poll();
+  }
+
+  private void wakeupProcessor() {
+    synchronized (queryProcessor) {
+      queryProcessor.notifyAll();
+    }
+  }
+
+  private final class QueryProcessor implements Runnable {
+    @Override
+    public void run() {
+
+      QuerySchedulingInfo query;
+
+      while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+        query = null;
+        if (getRunningQueries().size() == 0) {
+          query = pollScheduledQuery();
+        }
+
+        if (query != null) {
+          try {
+            manager.startQueryJob(query.getQueryId());
+          } catch (Throwable t) {
+            LOG.fatal("Exception during query startup:", t);
+            manager.stopQuery(query.getQueryId());
+          }
+        }
+
+        synchronized (queryProcessor) {
+          try {
+            queryProcessor.wait(500);
+          } catch (InterruptedException e) {
+            if (stopped.get()) {
+              break;
+            }
+            LOG.warn("Exception during shutdown: ", e);
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp
index 6f15a0e..4e8d7b0 100644
--- a/tajo-core/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query.jsp
@@ -26,16 +26,16 @@
 <%@ page import="org.apache.tajo.util.StringUtils" %>
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
 <%@ page import="java.text.SimpleDateFormat" %>
-<%@ page import="java.util.Collection" %>
-<%@ page import="java.util.HashMap" %>
-<%@ page import="java.util.List" %>
-<%@ page import="java.util.Map" %>
+<%@ page import="java.util.*" %>
 
 <%
   TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
 
   List<QueryInProgress> runningQueries =
-          JSPUtil.sortQueryInProgress(master.getContext().getQueryJobManager().getRunningQueries(), true);
+          new ArrayList<QueryInProgress>(master.getContext().getQueryJobManager().getSubmittedQueries());
+
+  runningQueries.addAll(master.getContext().getQueryJobManager().getRunningQueries());
+          JSPUtil.sortQueryInProgress(runningQueries, true);
 
   List<QueryInProgress> finishedQueries =
           JSPUtil.sortQueryInProgress(master.getContext().getQueryJobManager().getFinishedQueries(), true);

http://git-wip-us.apache.org/repos/asf/tajo/blob/4a747a0f/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
new file mode 100644
index 0000000..76f22d0
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.scheduler;
+
+import com.google.protobuf.ServiceException;
+import org.apache.tajo.*;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ClientProtos;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+
+import static org.junit.Assert.*;
+
+@Category(IntegrationTest.class)
+public class TestFifoScheduler {
+  private static TajoTestingCluster cluster;
+  private static TajoConf conf;
+  private static TajoClient client;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    cluster = TpchTestBase.getInstance().getTestingCluster();
+    conf = cluster.getConfiguration();
+    client = new TajoClient(conf);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    client.close();
+  }
+
+  @Test
+  public final void testKillScheduledQuery() throws IOException, ServiceException, InterruptedException {
+    ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(2) from lineitem");
+    ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select sleep(2) from lineitem");
+    Thread.sleep(1000);
+    QueryId queryId = new QueryId(res.getQueryId());
+    QueryId queryId2 = new QueryId(res2.getQueryId());
+    assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId2).getState());
+
+    client.killQuery(queryId2);
+    assertEquals(TajoProtos.QueryState.QUERY_KILLED, client.getQueryStatus(queryId2).getState());
+    client.killQuery(queryId);
+    assertEquals(TajoProtos.QueryState.QUERY_KILLED, client.getQueryStatus(queryId).getState());
+  }
+
+  @Test
+  public final void testForwardedQuery() throws IOException, ServiceException, InterruptedException {
+    ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(2) from lineitem");
+    ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select * from lineitem limit 1");
+
+    Thread.sleep(1000);
+    assertFalse(res2.getIsForwarded());
+    QueryId queryId2 = new QueryId(res2.getQueryId());
+    assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId2).getState());
+    ResultSet resSet = TajoClient.createResultSet(client, res2);
+    assertNotNull(resSet);
+
+    QueryId queryId = new QueryId(res.getQueryId());
+    assertEquals(TajoProtos.QueryState.QUERY_RUNNING, client.getQueryStatus(queryId).getState());
+    client.killQuery(queryId);
+  }
+
+  @Test
+  public final void testScheduledQuery() throws IOException, ServiceException, InterruptedException {
+    ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(2) from lineitem");
+    ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select sleep(2) from lineitem");
+    ClientProtos.SubmitQueryResponse res3 = client.executeQuery("select sleep(2) from lineitem");
+    ClientProtos.SubmitQueryResponse res4 = client.executeQuery("select sleep(2) from lineitem");
+
+    Thread.sleep(1000);
+
+    QueryId queryId = new QueryId(res.getQueryId());
+    QueryId queryId2 = new QueryId(res2.getQueryId());
+    QueryId queryId3 = new QueryId(res3.getQueryId());
+    QueryId queryId4 = new QueryId(res4.getQueryId());
+    assertEquals(TajoProtos.QueryState.QUERY_RUNNING, client.getQueryStatus(queryId).getState());
+
+    assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId2).getState());
+    assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId3).getState());
+    assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId4).getState());
+
+    client.killQuery(queryId2);
+    client.killQuery(queryId3);
+    client.killQuery(queryId4);
+    client.killQuery(queryId);
+  }
+}