You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/01/08 16:36:15 UTC

[07/13] tajo git commit: TAJO-1288: Refactoring org.apache.tajo.master package.

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java
deleted file mode 100644
index 0f161ff..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java
+++ /dev/null
@@ -1,443 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.state.*;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.TajoProtos.TaskAttemptState;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
-import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
-import org.apache.tajo.master.querymaster.Task.IntermediateEntry;
-import org.apache.tajo.master.querymaster.Task.PullHost;
-import org.apache.tajo.master.container.TajoContainerId;
-
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput;
-
-public class TaskAttempt implements EventHandler<TaskAttemptEvent> {
-
-  private static final Log LOG = LogFactory.getLog(TaskAttempt.class);
-
-  private final static int EXPIRE_TIME = 15000;
-
-  private final TaskAttemptId id;
-  private final Task task;
-  final EventHandler eventHandler;
-
-  private TajoContainerId containerId;
-  private WorkerConnectionInfo workerConnectionInfo;
-  private int expire;
-
-  private final Lock readLock;
-  private final Lock writeLock;
-
-  private final List<String> diagnostics = new ArrayList<String>();
-
-  private final TaskAttemptScheduleContext scheduleContext;
-
-  private float progress;
-  private CatalogProtos.TableStatsProto inputStats;
-  private CatalogProtos.TableStatsProto resultStats;
-
-  protected static final StateMachineFactory
-      <TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
-      stateMachineFactory = new StateMachineFactory
-      <TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
-      (TaskAttemptState.TA_NEW)
-
-      // Transitions from TA_NEW state
-      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
-          TaskAttemptEventType.TA_SCHEDULE, new TaskAttemptScheduleTransition())
-      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
-          TaskAttemptEventType.TA_RESCHEDULE, new TaskAttemptScheduleTransition())
-      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_KILLED,
-          TaskAttemptEventType.TA_KILL,
-          new TaskKilledCompleteTransition())
-
-      // Transitions from TA_UNASSIGNED state
-      .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_ASSIGNED,
-          TaskAttemptEventType.TA_ASSIGNED,
-          new LaunchTransition())
-      .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_KILL_WAIT,
-          TaskAttemptEventType.TA_KILL,
-          new KillUnassignedTaskTransition())
-
-      // Transitions from TA_ASSIGNED state
-      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_ASSIGNED,
-          TaskAttemptEventType.TA_ASSIGNED, new AlreadyAssignedTransition())
-      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILL_WAIT,
-          TaskAttemptEventType.TA_KILL,
-          new KillTaskTransition())
-      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILLED,
-          TaskAttemptEventType.TA_KILL,
-          new KillTaskTransition())
-      .addTransition(TaskAttemptState.TA_ASSIGNED,
-          EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED),
-          TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
-      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_SUCCEEDED,
-          TaskAttemptEventType.TA_DONE, new SucceededTransition())
-      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_FAILED,
-          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
-
-      // Transitions from TA_RUNNING state
-      .addTransition(TaskAttemptState.TA_RUNNING,
-          EnumSet.of(TaskAttemptState.TA_RUNNING),
-          TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
-      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILL_WAIT,
-          TaskAttemptEventType.TA_KILL,
-          new KillTaskTransition())
-      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_SUCCEEDED,
-          TaskAttemptEventType.TA_DONE, new SucceededTransition())
-      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_FAILED,
-          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
-
-      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
-          TaskAttemptEventType.TA_LOCAL_KILLED,
-          new TaskKilledCompleteTransition())
-      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
-          TaskAttemptEventType.TA_ASSIGNED,
-          new KillTaskTransition())
-      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
-          TaskAttemptEventType.TA_SCHEDULE_CANCELED,
-          new TaskKilledCompleteTransition())
-      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
-          TaskAttemptEventType.TA_DONE,
-          new TaskKilledCompleteTransition())
-      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_FAILED,
-          TaskAttemptEventType.TA_FATAL_ERROR)
-      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
-          EnumSet.of(
-              TaskAttemptEventType.TA_KILL,
-              TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
-              TaskAttemptEventType.TA_UPDATE))
-
-      // Transitions from TA_SUCCEEDED state
-      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
-          TaskAttemptEventType.TA_UPDATE)
-      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
-          TaskAttemptEventType.TA_DONE, new AlreadyDoneTransition())
-      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_FAILED,
-          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
-       // Ignore-able transitions
-      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
-          TaskAttemptEventType.TA_KILL)
-
-      // Transitions from TA_KILLED state
-      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
-          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE)
-      // Ignore-able transitions
-      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
-          EnumSet.of(
-              TaskAttemptEventType.TA_UPDATE))
-      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
-          EnumSet.of(
-              TaskAttemptEventType.TA_LOCAL_KILLED,
-              TaskAttemptEventType.TA_KILL,
-              TaskAttemptEventType.TA_ASSIGNED,
-              TaskAttemptEventType.TA_DONE),
-          new TaskKilledCompleteTransition())
-      .installTopology();
-
-  private final StateMachine<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
-    stateMachine;
-
-
-  public TaskAttempt(final TaskAttemptScheduleContext scheduleContext,
-                     final TaskAttemptId id, final Task task,
-                     final EventHandler eventHandler) {
-    this.scheduleContext = scheduleContext;
-    this.id = id;
-    this.expire = TaskAttempt.EXPIRE_TIME;
-    this.task = task;
-    this.eventHandler = eventHandler;
-
-    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
-    this.readLock = readWriteLock.readLock();
-    this.writeLock = readWriteLock.writeLock();
-
-    stateMachine = stateMachineFactory.make(this);
-  }
-
-  public TaskAttemptState getState() {
-    readLock.lock();
-    try {
-      return stateMachine.getCurrentState();
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  public TaskAttemptId getId() {
-    return this.id;
-  }
-
-  public boolean isLeafTask() {
-    return this.task.isLeafTask();
-  }
-
-  public Task getTask() {
-    return this.task;
-  }
-
-  public WorkerConnectionInfo getWorkerConnectionInfo() {
-    return this.workerConnectionInfo;
-  }
-
-  public void setContainerId(TajoContainerId containerId) {
-    this.containerId = containerId;
-  }
-
-  public synchronized void setExpireTime(int expire) {
-    this.expire = expire;
-  }
-
-  public synchronized void updateExpireTime(int period) {
-    this.setExpireTime(this.expire - period);
-  }
-
-  public synchronized void resetExpireTime() {
-    this.setExpireTime(TaskAttempt.EXPIRE_TIME);
-  }
-
-  public int getLeftTime() {
-    return this.expire;
-  }
-
-  public float getProgress() {
-    return progress;
-  }
-
-  public TableStats getInputStats() {
-    if (inputStats == null) {
-      return null;
-    }
-
-    return new TableStats(inputStats);
-  }
-
-  public TableStats getResultStats() {
-    if (resultStats == null) {
-      return null;
-    }
-    return new TableStats(resultStats);
-  }
-
-  private void fillTaskStatistics(TaskCompletionReport report) {
-    this.progress = 1.0f;
-
-    List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
-
-    if (report.getShuffleFileOutputsCount() > 0) {
-      this.getTask().setShuffleFileOutputs(report.getShuffleFileOutputsList());
-
-      PullHost host = new PullHost(getWorkerConnectionInfo().getHost(), getWorkerConnectionInfo().getPullServerPort());
-      for (ShuffleFileOutput p : report.getShuffleFileOutputsList()) {
-        IntermediateEntry entry = new IntermediateEntry(getId().getTaskId().getId(),
-            getId().getId(), p.getPartId(), host, p.getVolume());
-        partitions.add(entry);
-      }
-    }
-    this.getTask().setIntermediateData(partitions);
-
-    if (report.hasInputStats()) {
-      this.inputStats = report.getInputStats();
-    }
-    if (report.hasResultStats()) {
-      this.resultStats = report.getResultStats();
-      this.getTask().setStats(new TableStats(resultStats));
-    }
-  }
-
-  private static class TaskAttemptScheduleTransition implements
-      SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
-
-    @Override
-    public void transition(TaskAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
-      taskAttempt.eventHandler.handle(new TaskAttemptToSchedulerEvent(
-          EventType.T_SCHEDULE, taskAttempt.getTask().getId().getExecutionBlockId(),
-          taskAttempt.scheduleContext, taskAttempt));
-    }
-  }
-
-  private static class KillUnassignedTaskTransition implements
-      SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
-
-    @Override
-    public void transition(TaskAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
-      taskAttempt.eventHandler.handle(new TaskAttemptToSchedulerEvent(
-          EventType.T_SCHEDULE_CANCEL, taskAttempt.getTask().getId().getExecutionBlockId(),
-          taskAttempt.scheduleContext, taskAttempt));
-    }
-  }
-
-  private static class LaunchTransition
-      implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
-
-    @Override
-    public void transition(TaskAttempt taskAttempt,
-                           TaskAttemptEvent event) {
-      TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event;
-      taskAttempt.containerId = castEvent.getContainerId();
-      taskAttempt.workerConnectionInfo = castEvent.getWorkerConnectionInfo();
-      taskAttempt.eventHandler.handle(
-          new TaskTAttemptEvent(taskAttempt.getId(),
-              TaskEventType.T_ATTEMPT_LAUNCHED));
-    }
-  }
-
-  private static class TaskKilledCompleteTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
-
-    @Override
-    public void transition(TaskAttempt taskAttempt,
-                           TaskAttemptEvent event) {
-      taskAttempt.getTask().handle(new TaskEvent(taskAttempt.getId().getTaskId(),
-          TaskEventType.T_ATTEMPT_KILLED));
-      LOG.info(taskAttempt.getId() + " Received TA_KILLED Status from LocalTask");
-    }
-  }
-
-  private static class StatusUpdateTransition
-      implements MultipleArcTransition<TaskAttempt, TaskAttemptEvent, TaskAttemptState> {
-
-    @Override
-    public TaskAttemptState transition(TaskAttempt taskAttempt,
-                                       TaskAttemptEvent event) {
-      TaskAttemptStatusUpdateEvent updateEvent = (TaskAttemptStatusUpdateEvent) event;
-
-      taskAttempt.progress = updateEvent.getStatus().getProgress();
-      taskAttempt.inputStats = updateEvent.getStatus().getInputStats();
-      taskAttempt.resultStats = updateEvent.getStatus().getResultStats();
-
-      return TaskAttemptState.TA_RUNNING;
-    }
-  }
-
-  private void addDiagnosticInfo(String diag) {
-    if (diag != null && !diag.equals("")) {
-      diagnostics.add(diag);
-    }
-  }
-
-  private static class AlreadyAssignedTransition
-      implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{
-
-    @Override
-    public void transition(TaskAttempt taskAttempt,
-                           TaskAttemptEvent taskAttemptEvent) {
-    }
-  }
-
-  private static class AlreadyDoneTransition
-      implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{
-
-    @Override
-    public void transition(TaskAttempt taskAttempt,
-                           TaskAttemptEvent taskAttemptEvent) {
-    }
-  }
-
-  private static class SucceededTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
-    @Override
-    public void transition(TaskAttempt taskAttempt,
-                           TaskAttemptEvent event) {
-      TaskCompletionReport report = ((TaskCompletionEvent)event).getReport();
-
-      try {
-        taskAttempt.fillTaskStatistics(report);
-        taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_SUCCEEDED));
-      } catch (Throwable t) {
-        taskAttempt.eventHandler.handle(new TaskFatalErrorEvent(taskAttempt.getId(), t.getMessage()));
-        taskAttempt.addDiagnosticInfo(ExceptionUtils.getStackTrace(t));
-      }
-    }
-  }
-
-  private static class KillTaskTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
-
-    @Override
-    public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) {
-      taskAttempt.eventHandler.handle(new LocalTaskEvent(taskAttempt.getId(), taskAttempt.containerId,
-          LocalTaskEventType.KILL));
-    }
-  }
-
-  private static class FailedTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{
-    @Override
-    public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) {
-      TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event;
-      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED));
-      taskAttempt.addDiagnosticInfo(errorEvent.errorMessage());
-      LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getWorkerConnectionInfo().getHost()
-          + " >> " + errorEvent.errorMessage());
-    }
-  }
-
-  @Override
-  public void handle(TaskAttemptEvent event) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing " + event.getTaskAttemptId() + " of type " + event.getType());
-    }
-    try {
-      writeLock.lock();
-      TaskAttemptState oldState = getState();
-      try {
-        stateMachine.doTransition(event.getType(), event);
-      } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state of " + event.getTaskAttemptId() + ")"
-            + ", eventType:" + event.getType().name()
-            + ", oldState:" + oldState.name()
-            + ", nextState:" + getState().name()
-            , e);
-        eventHandler.handle(
-            new StageDiagnosticsUpdateEvent(event.getTaskAttemptId().getTaskId().getExecutionBlockId(),
-                "Can't handle this event at current state of " + event.getTaskAttemptId() + ")"));
-        eventHandler.handle(
-            new StageEvent(event.getTaskAttemptId().getTaskId().getExecutionBlockId(),
-                StageEventType.SQ_INTERNAL_ERROR));
-      }
-
-      //notify the eventhandler of state change
-      if (LOG.isDebugEnabled()) {
-       if (oldState != getState()) {
-          LOG.debug(id + " TaskAttempt Transitioned from " + oldState + " to "
-              + getState());
-        }
-      }
-    }
-
-    finally {
-      writeLock.unlock();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index f1a9224..c4200d5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -36,10 +36,9 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ContainerProtocol;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.querymaster.QueryInProgress;
+import org.apache.tajo.querymaster.QueryInProgress;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.util.ApplicationIdUtils;
-import org.apache.tajo.util.StringUtils;
 
 import java.io.IOException;
 import java.util.*;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index 9c2b71b..b237cc5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.service.Service;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.ipc.ContainerProtocol;
 import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.querymaster.QueryInProgress;
+import org.apache.tajo.querymaster.QueryInProgress;
 
 import java.io.IOException;
 import java.util.Collection;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QuerySchedulingInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QuerySchedulingInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QuerySchedulingInfo.java
new file mode 100644
index 0000000..3dd3389
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QuerySchedulingInfo.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.scheduler;
+
+import com.google.common.base.Objects;
+import org.apache.tajo.QueryId;
+
+public class QuerySchedulingInfo {
+  private QueryId queryId;
+  private Integer priority;
+  private Long startTime;
+
+  public QuerySchedulingInfo(QueryId queryId, Integer priority, Long startTime) {
+    this.queryId = queryId;
+    this.priority = priority;
+    this.startTime = startTime;
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  public Integer getPriority() {
+    return priority;
+  }
+
+  public Long getStartTime() {
+    return startTime;
+  }
+
+  public String getName() {
+    return queryId.getId();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(startTime, getName(), priority);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
new file mode 100644
index 0000000..02203a9
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.scheduler;
+
+import org.apache.tajo.QueryId;
+import org.apache.tajo.querymaster.QueryInProgress;
+
+import java.util.List;
+
+public interface Scheduler {
+
+  public Mode getMode();
+
+  public String getName();
+
+  public boolean addQuery(QueryInProgress resource);
+
+  public boolean removeQuery(QueryId queryId);
+
+  public List<QueryInProgress> getRunningQueries();
+
+  public enum Mode {
+    FIFO
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SchedulingAlgorithms.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SchedulingAlgorithms.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SchedulingAlgorithms.java
new file mode 100644
index 0000000..7fd07b5
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SchedulingAlgorithms.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.scheduler;
+
+import java.util.Comparator;
+
+/**
+ * Utility class containing scheduling algorithms used in the scheduler.
+ */
+
+public class SchedulingAlgorithms  {
+  /**
+   * Compare Schedulables in order of priority and then submission time, as in
+   * the default FIFO scheduler in Tajo.
+   */
+  public static class FifoComparator implements Comparator<QuerySchedulingInfo> {
+    @Override
+    public int compare(QuerySchedulingInfo q1, QuerySchedulingInfo q2) {
+      int res = q1.getPriority().compareTo(q2.getPriority());
+      if (res == 0) {
+        res = (int) Math.signum(q1.getStartTime() - q2.getStartTime());
+      }
+      if (res == 0) {
+        // In the rare case where jobs were submitted at the exact same time,
+        // compare them by name (which will be the QueryId) to get a deterministic ordering
+        res = q1.getName().compareTo(q2.getName());
+      }
+      return res;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
new file mode 100644
index 0000000..bd8ca28
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.scheduler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.querymaster.QueryInProgress;
+import org.apache.tajo.master.QueryJobManager;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SimpleFifoScheduler implements Scheduler {
+  private static final Log LOG = LogFactory.getLog(SimpleFifoScheduler.class.getName());
+  private LinkedList<QuerySchedulingInfo> pool = new LinkedList<QuerySchedulingInfo>();
+  private final Thread queryProcessor;
+  private AtomicBoolean stopped = new AtomicBoolean();
+  private QueryJobManager manager;
+  private Comparator<QuerySchedulingInfo> COMPARATOR = new SchedulingAlgorithms.FifoComparator();
+
+  public SimpleFifoScheduler(QueryJobManager manager) {
+    this.manager = manager;
+    this.queryProcessor = new Thread(new QueryProcessor());
+    this.queryProcessor.setName("Query Processor");
+  }
+
+  @Override
+  public Mode getMode() {
+    return Mode.FIFO;
+  }
+
+  @Override
+  public String getName() {
+    return manager.getName();
+  }
+
+  @Override
+  public boolean addQuery(QueryInProgress queryInProgress) {
+    int qSize = pool.size();
+    if (qSize != 0 && qSize % 100 == 0) {
+      LOG.info("Size of Fifo queue is " + qSize);
+    }
+
+    QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1, queryInProgress.getStartTime());
+    boolean result = pool.add(querySchedulingInfo);
+    if (getRunningQueries().size() == 0) wakeupProcessor();
+    return result;
+  }
+
+  @Override
+  public boolean removeQuery(QueryId queryId) {
+    return pool.remove(getQueryByQueryId(queryId));
+  }
+
+  public QuerySchedulingInfo getQueryByQueryId(QueryId queryId) {
+    for (QuerySchedulingInfo querySchedulingInfo : pool) {
+      if (querySchedulingInfo.getQueryId().equals(queryId)) {
+        return querySchedulingInfo;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public List<QueryInProgress> getRunningQueries() {
+    return new ArrayList<QueryInProgress>(manager.getRunningQueries());
+  }
+
+  public void start() {
+    queryProcessor.start();
+  }
+
+  public void stop() {
+    if (stopped.getAndSet(true)) {
+      return;
+    }
+    pool.clear();
+    synchronized (queryProcessor) {
+      queryProcessor.interrupt();
+    }
+  }
+
+  private QuerySchedulingInfo pollScheduledQuery() {
+    if (pool.size() > 1) {
+      Collections.sort(pool, COMPARATOR);
+    }
+    return pool.poll();
+  }
+
+  private void wakeupProcessor() {
+    synchronized (queryProcessor) {
+      queryProcessor.notifyAll();
+    }
+  }
+
+  private final class QueryProcessor implements Runnable {
+    @Override
+    public void run() {
+
+      QuerySchedulingInfo query;
+
+      while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+        query = null;
+        if (getRunningQueries().size() == 0) {
+          query = pollScheduledQuery();
+        }
+
+        if (query != null) {
+          try {
+            manager.startQueryJob(query.getQueryId());
+          } catch (Throwable t) {
+            LOG.fatal("Exception during query startup:", t);
+            manager.stopQuery(query.getQueryId());
+          }
+        }
+
+        synchronized (queryProcessor) {
+          try {
+            queryProcessor.wait(500);
+          } catch (InterruptedException e) {
+            if (stopped.get()) {
+              break;
+            }
+            LOG.warn("Exception during shutdown: ", e);
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java b/tajo-core/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java
deleted file mode 100644
index 3f48ca5..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.session;
-
-public class InvalidSessionException extends Exception {
-  public InvalidSessionException(String sessionId) {
-    super("Invalid session id \"" + sessionId + "\"");
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java b/tajo-core/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java
deleted file mode 100644
index 686d860..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.session;
-
-public class NoSuchSessionVariableException extends Exception {
-  public NoSuchSessionVariableException(String varname) {
-    super("No such session variable \"" + varname + "\"");
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
deleted file mode 100644
index 5f44ecb..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.session;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.SessionVars;
-import org.apache.tajo.master.NonForwardQueryResultScanner;
-import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.common.ProtoObject;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.tajo.ipc.TajoWorkerProtocol.SessionProto;
-
-public class Session implements SessionConstants, ProtoObject<SessionProto>, Cloneable {
-  private static final Log LOG = LogFactory.getLog(Session.class);
-
-  private final String sessionId;
-  private final String userName;
-  private String currentDatabase;
-  private final Map<String, String> sessionVariables;
-  private final Map<QueryId, NonForwardQueryResultScanner> nonForwardQueryMap = new HashMap<QueryId, NonForwardQueryResultScanner>();
-
-  // transient status
-  private volatile long lastAccessTime;
-
-  public Session(String sessionId, String userName, String databaseName) {
-    this.sessionId = sessionId;
-    this.userName = userName;
-    this.currentDatabase = databaseName;
-    this.lastAccessTime = System.currentTimeMillis();
-
-    this.sessionVariables = new HashMap<String, String>();
-    sessionVariables.put(SessionVars.SESSION_ID.keyname(), sessionId);
-    sessionVariables.put(SessionVars.USERNAME.keyname(), userName);
-    selectDatabase(databaseName);
-  }
-
-  public Session(SessionProto proto) {
-    sessionId = proto.getSessionId();
-    userName = proto.getUsername();
-    currentDatabase = proto.getCurrentDatabase();
-    lastAccessTime = proto.getLastAccessTime();
-    KeyValueSet keyValueSet = new KeyValueSet(proto.getVariables());
-    sessionVariables = keyValueSet.getAllKeyValus();
-  }
-
-  public String getSessionId() {
-    return sessionId;
-  }
-
-  public String getUserName() {
-    return userName;
-  }
-
-  public void updateLastAccessTime() {
-    lastAccessTime = System.currentTimeMillis();
-  }
-
-  public long getLastAccessTime() {
-    return lastAccessTime;
-  }
-
-  public void setVariable(String name, String value) {
-    synchronized (sessionVariables) {
-      sessionVariables.put(SessionVars.handleDeprecatedName(name), value);
-    }
-  }
-
-  public String getVariable(String name) throws NoSuchSessionVariableException {
-    synchronized (sessionVariables) {
-      if (sessionVariables.containsKey(name)) {
-        return sessionVariables.get(SessionVars.handleDeprecatedName(name));
-      } else {
-        throw new NoSuchSessionVariableException(name);
-      }
-    }
-  }
-
-  public void removeVariable(String name) {
-    synchronized (sessionVariables) {
-      sessionVariables.remove(SessionVars.handleDeprecatedName(name));
-    }
-  }
-
-  public synchronized Map<String, String> getAllVariables() {
-    synchronized (sessionVariables) {
-      sessionVariables.put(SessionVars.SESSION_ID.keyname(), sessionId);
-      sessionVariables.put(SessionVars.USERNAME.keyname(), userName);
-      sessionVariables.put(SessionVars.SESSION_LAST_ACCESS_TIME.keyname(), String.valueOf(lastAccessTime));
-      sessionVariables.put(SessionVars.CURRENT_DATABASE.keyname(), currentDatabase);
-      return ImmutableMap.copyOf(sessionVariables);
-    }
-  }
-
-  public synchronized void selectDatabase(String databaseName) {
-    this.currentDatabase = databaseName;
-  }
-
-  public synchronized String getCurrentDatabase() {
-    return currentDatabase;
-  }
-
-  @Override
-  public SessionProto getProto() {
-    SessionProto.Builder builder = SessionProto.newBuilder();
-    builder.setSessionId(getSessionId());
-    builder.setUsername(getUserName());
-    builder.setCurrentDatabase(getCurrentDatabase());
-    builder.setLastAccessTime(lastAccessTime);
-    KeyValueSet variables = new KeyValueSet();
-
-    synchronized (sessionVariables) {
-      variables.putAll(this.sessionVariables);
-      builder.setVariables(variables.getProto());
-      return builder.build();
-    }
-  }
-
-  public String toString() {
-    return "user=" + getUserName() + ",id=" + getSessionId() +",last_atime=" + getLastAccessTime();
-  }
-
-  public Session clone() throws CloneNotSupportedException {
-    Session newSession = (Session) super.clone();
-    newSession.sessionVariables.putAll(getAllVariables());
-    return newSession;
-  }
-
-  public NonForwardQueryResultScanner getNonForwardQueryResultScanner(QueryId queryId) {
-    synchronized (nonForwardQueryMap) {
-      return nonForwardQueryMap.get(queryId);
-    }
-  }
-
-  public void addNonForwardQueryResultScanner(NonForwardQueryResultScanner resultScanner) {
-    synchronized (nonForwardQueryMap) {
-      nonForwardQueryMap.put(resultScanner.getQueryId(), resultScanner);
-    }
-  }
-
-  public void closeNonForwardQueryResultScanner(QueryId queryId) {
-    NonForwardQueryResultScanner resultScanner;
-    synchronized (nonForwardQueryMap) {
-      resultScanner = nonForwardQueryMap.remove(queryId);
-    }
-
-    if (resultScanner != null) {
-      try {
-        resultScanner.close();
-      } catch (Exception e) {
-        LOG.error("NonForwardQueryResultScanne close error: " + e.getMessage(), e);
-      }
-    }
-  }
-
-  public void close() {
-    try {
-      synchronized (nonForwardQueryMap) {
-        for (NonForwardQueryResultScanner eachQueryScanner: nonForwardQueryMap.values()) {
-          try {
-            eachQueryScanner.close();
-          } catch (Exception e) {
-            LOG.error("Error while closing NonForwardQueryResultScanner: " +
-                eachQueryScanner.getSessionId() + ", " + e.getMessage(), e);
-          }
-        }
-
-        nonForwardQueryMap.clear();
-      }
-    } catch (Throwable t) {
-      LOG.error(t.getMessage(), t);
-      throw new RuntimeException(t.getMessage(), t);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/SessionConstants.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionConstants.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionConstants.java
deleted file mode 100644
index 46f49a2..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionConstants.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.session;
-
-public interface SessionConstants {
-
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEvent.java
deleted file mode 100644
index dce3ba6..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEvent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.session;
-
-import org.apache.hadoop.yarn.event.AbstractEvent;
-
-public class SessionEvent extends AbstractEvent<SessionEventType> {
-  private final String sessionId;
-
-  public SessionEvent(String sessionId, SessionEventType sessionEventType) {
-    super(sessionEventType);
-    this.sessionId = sessionId;
-  }
-
-  public String getSessionId() {
-    return sessionId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEventType.java
deleted file mode 100644
index 64c6fc6..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEventType.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.session;
-
-public enum SessionEventType {
-  EXPIRE,
-  PING
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java
deleted file mode 100644
index 912f769..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.session;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tajo.conf.TajoConf;
-
-public class SessionLivelinessMonitor extends AbstractLivelinessMonitor<String> {
-
-  private EventHandler dispatcher;
-
-  public SessionLivelinessMonitor(Dispatcher d) {
-    super(SessionLivelinessMonitor.class.getSimpleName(), new SystemClock());
-    this.dispatcher = d.getEventHandler();
-  }
-
-  public void serviceInit(Configuration conf) throws Exception {
-    Preconditions.checkArgument(conf instanceof TajoConf);
-    TajoConf systemConf = (TajoConf) conf;
-
-    // seconds
-    int expireIntvl = systemConf.getIntVar(TajoConf.ConfVars.$CLIENT_SESSION_EXPIRY_TIME);
-    setExpireInterval(expireIntvl);
-    setMonitorInterval(expireIntvl / 3);
-    super.serviceInit(conf);
-  }
-
-  @Override
-  protected void expire(String id) {
-    dispatcher.handle(new SessionEvent(id, SessionEventType.EXPIRE));
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java
deleted file mode 100644
index d701d03..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.session;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class SessionManager extends CompositeService implements EventHandler<SessionEvent> {
-  private static final Log LOG = LogFactory.getLog(SessionManager.class);
-
-  public final ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>();
-  private final Dispatcher dispatcher;
-  private SessionLivelinessMonitor sessionLivelinessMonitor;
-
-
-  public SessionManager(Dispatcher dispatcher) {
-    super(SessionManager.class.getSimpleName());
-    this.dispatcher = dispatcher;
-  }
-
-  @Override
-  public void serviceInit(Configuration conf) throws Exception {
-    sessionLivelinessMonitor = new SessionLivelinessMonitor(dispatcher);
-    addIfService(sessionLivelinessMonitor);
-    super.serviceInit(conf);
-  }
-
-  @Override
-  public void serviceStop() throws Exception {
-    super.serviceStop();
-  }
-
-  private void assertSessionExistence(String sessionId) throws InvalidSessionException {
-    if (!sessions.containsKey(sessionId)) {
-      throw new InvalidSessionException(sessionId);
-    }
-  }
-
-  public String createSession(String username, String baseDatabaseName) throws InvalidSessionException {
-    String sessionId;
-    Session oldSession;
-
-    sessionId = UUID.randomUUID().toString();
-    Session newSession = new Session(sessionId, username, baseDatabaseName);
-    oldSession = sessions.putIfAbsent(sessionId, newSession);
-    if (oldSession != null) {
-      throw new InvalidSessionException("Session id is duplicated: " + oldSession.getSessionId());
-    }
-    LOG.info("Session " + sessionId + " is created." );
-    return sessionId;
-  }
-
-  public Session removeSession(String sessionId) {
-    if (sessions.containsKey(sessionId)) {
-      LOG.info("Session " + sessionId + " is removed.");
-      Session session = sessions.remove(sessionId);
-      session.close();
-      return session;
-    } else {
-      LOG.error("No such session id: " + sessionId);
-      return null;
-    }
-  }
-
-  public Session getSession(String sessionId) throws InvalidSessionException {
-    assertSessionExistence(sessionId);
-    touch(sessionId);
-    return sessions.get(sessionId);
-  }
-
-  public void setVariable(String sessionId, String name, String value) throws InvalidSessionException {
-    assertSessionExistence(sessionId);
-    touch(sessionId);
-    sessions.get(sessionId).setVariable(name, value);
-  }
-
-  public String getVariable(String sessionId, String name)
-      throws InvalidSessionException, NoSuchSessionVariableException {
-    assertSessionExistence(sessionId);
-    touch(sessionId);
-    return sessions.get(sessionId).getVariable(name);
-  }
-
-  public void removeVariable(String sessionId, String name) throws InvalidSessionException {
-    assertSessionExistence(sessionId);
-    touch(sessionId);
-    sessions.get(sessionId).removeVariable(name);
-  }
-
-  public Map<String, String> getAllVariables(String sessionId) throws InvalidSessionException {
-    assertSessionExistence(sessionId);
-    touch(sessionId);
-    return sessions.get(sessionId).getAllVariables();
-  }
-
-  public void touch(String sessionId) throws InvalidSessionException {
-    assertSessionExistence(sessionId);
-    sessions.get(sessionId).updateLastAccessTime();
-    sessionLivelinessMonitor.receivedPing(sessionId);
-  }
-
-  @Override
-  public void handle(SessionEvent event) {
-    LOG.info("Processing " + event.getSessionId() + " of type " + event.getType());
-
-    try {
-      assertSessionExistence(event.getSessionId());
-      touch(event.getSessionId());
-    } catch (InvalidSessionException e) {
-      LOG.error(e);
-    }
-
-    if (event.getType() == SessionEventType.EXPIRE) {
-      Session session = removeSession(event.getSessionId());
-      if (session != null) {
-        LOG.info("[Expired] Session username=" + session.getUserName() + ",sessionid=" + event.getSessionId());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/metrics/CatalogMetricsGaugeSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/metrics/CatalogMetricsGaugeSet.java b/tajo-core/src/main/java/org/apache/tajo/metrics/CatalogMetricsGaugeSet.java
new file mode 100644
index 0000000..82ebe29
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/metrics/CatalogMetricsGaugeSet.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+import org.apache.tajo.master.TajoMaster;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+
+public class CatalogMetricsGaugeSet implements MetricSet {
+  TajoMaster.MasterContext tajoMasterContext;
+  public CatalogMetricsGaugeSet(TajoMaster.MasterContext tajoMasterContext) {
+    this.tajoMasterContext = tajoMasterContext;
+  }
+
+  @Override
+  public Map<String, Metric> getMetrics() {
+    Map<String, Metric> metricsMap = new HashMap<String, Metric>();
+    metricsMap.put("numTables", new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return tajoMasterContext.getCatalog().getAllTableNames(DEFAULT_DATABASE_NAME).size();
+      }
+    });
+
+    metricsMap.put("numFunctions", new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return tajoMasterContext.getCatalog().getFunctions().size();
+      }
+    });
+
+    return metricsMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/metrics/WorkerResourceMetricsGaugeSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/metrics/WorkerResourceMetricsGaugeSet.java b/tajo-core/src/main/java/org/apache/tajo/metrics/WorkerResourceMetricsGaugeSet.java
new file mode 100644
index 0000000..229a80a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/metrics/WorkerResourceMetricsGaugeSet.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.master.rm.WorkerState;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class WorkerResourceMetricsGaugeSet implements MetricSet {
+  TajoMaster.MasterContext tajoMasterContext;
+  public WorkerResourceMetricsGaugeSet(TajoMaster.MasterContext tajoMasterContext) {
+    this.tajoMasterContext = tajoMasterContext;
+  }
+
+  @Override
+  public Map<String, Metric> getMetrics() {
+    Map<String, Metric> metricsMap = new HashMap<String, Metric>();
+    metricsMap.put("totalWorkers", new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return tajoMasterContext.getResourceManager().getWorkers().size();
+      }
+    });
+
+    metricsMap.put("liveWorkers", new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return getNumWorkers(WorkerState.RUNNING);
+      }
+    });
+
+    metricsMap.put("deadWorkers", new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return getNumWorkers(WorkerState.LOST);
+      }
+    });
+
+    return metricsMap;
+  }
+
+  protected int getNumWorkers(WorkerState status) {
+    int numWorkers = 0;
+    for(Worker eachWorker: tajoMasterContext.getResourceManager().getWorkers().values()) {
+      if(eachWorker.getState() == status) {
+        numWorkers++;
+      }
+    }
+
+    return numWorkers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java
new file mode 100644
index 0000000..e45f274
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.master.event.TaskRequestEvent;
+import org.apache.tajo.master.event.TaskSchedulerEvent;
+
+
+public abstract class AbstractTaskScheduler extends AbstractService implements EventHandler<TaskSchedulerEvent> {
+
+  protected int hostLocalAssigned;
+  protected int rackLocalAssigned;
+  protected int totalAssigned;
+
+  /**
+   * Construct the service.
+   *
+   * @param name service name
+   */
+  public AbstractTaskScheduler(String name) {
+    super(name);
+  }
+
+  public int getHostLocalAssigned() {
+    return hostLocalAssigned;
+  }
+
+  public int getRackLocalAssigned() {
+    return rackLocalAssigned;
+  }
+
+  public int getTotalAssigned() {
+    return totalAssigned;
+  }
+
+  public abstract void handleTaskRequestEvent(TaskRequestEvent event);
+  public abstract int remainingScheduledObjectNum();
+}