You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/01/08 16:36:10 UTC

[02/13] tajo git commit: TAJO-1288: Refactoring org.apache.tajo.master package.

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
new file mode 100644
index 0000000..86c49b4
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
@@ -0,0 +1,443 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
+import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
+import org.apache.tajo.querymaster.Task.IntermediateEntry;
+import org.apache.tajo.querymaster.Task.PullHost;
+import org.apache.tajo.master.container.TajoContainerId;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput;
+
+public class TaskAttempt implements EventHandler<TaskAttemptEvent> {
+
+  private static final Log LOG = LogFactory.getLog(TaskAttempt.class);
+
+  private final static int EXPIRE_TIME = 15000;
+
+  private final TaskAttemptId id;
+  private final Task task;
+  final EventHandler eventHandler;
+
+  private TajoContainerId containerId;
+  private WorkerConnectionInfo workerConnectionInfo;
+  private int expire;
+
+  private final Lock readLock;
+  private final Lock writeLock;
+
+  private final List<String> diagnostics = new ArrayList<String>();
+
+  private final TaskAttemptScheduleContext scheduleContext;
+
+  private float progress;
+  private CatalogProtos.TableStatsProto inputStats;
+  private CatalogProtos.TableStatsProto resultStats;
+
+  protected static final StateMachineFactory
+      <TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+      stateMachineFactory = new StateMachineFactory
+      <TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+      (TaskAttemptState.TA_NEW)
+
+      // Transitions from TA_NEW state (TA_NEW is the initial state passed to the factory above)
+      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
+          TaskAttemptEventType.TA_SCHEDULE, new TaskAttemptScheduleTransition())
+      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
+          TaskAttemptEventType.TA_RESCHEDULE, new TaskAttemptScheduleTransition())
+      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_KILL,
+          new TaskKilledCompleteTransition())
+
+      // Transitions from TA_UNASSIGNED state
+      .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_ASSIGNED,
+          TaskAttemptEventType.TA_ASSIGNED,
+          new LaunchTransition())
+      .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_KILL_WAIT,
+          TaskAttemptEventType.TA_KILL,
+          new KillUnassignedTaskTransition())
+
+      // Transitions from TA_ASSIGNED state (NOTE(review): TA_KILL is added twice below, with different targets)
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_ASSIGNED,
+          TaskAttemptEventType.TA_ASSIGNED, new AlreadyAssignedTransition())
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILL_WAIT,
+          TaskAttemptEventType.TA_KILL,
+          new KillTaskTransition())
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_KILL,
+          new KillTaskTransition())
+      .addTransition(TaskAttemptState.TA_ASSIGNED,
+          EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED),
+          TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_DONE, new SucceededTransition())
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_FAILED,
+          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+
+      // Transitions from TA_RUNNING state
+      .addTransition(TaskAttemptState.TA_RUNNING,
+          EnumSet.of(TaskAttemptState.TA_RUNNING),
+          TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
+      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILL_WAIT,
+          TaskAttemptEventType.TA_KILL,
+          new KillTaskTransition())
+      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_DONE, new SucceededTransition())
+      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_FAILED,
+          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+      // Transitions from TA_KILL_WAIT state
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_LOCAL_KILLED,
+          new TaskKilledCompleteTransition())
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
+          TaskAttemptEventType.TA_ASSIGNED,
+          new KillTaskTransition())
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_SCHEDULE_CANCELED,
+          new TaskKilledCompleteTransition())
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_DONE,
+          new TaskKilledCompleteTransition())
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_FAILED,
+          TaskAttemptEventType.TA_FATAL_ERROR)
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
+          EnumSet.of(
+              TaskAttemptEventType.TA_KILL,
+              TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+              TaskAttemptEventType.TA_UPDATE))
+
+      // Transitions from TA_SUCCEEDED state
+      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_UPDATE)
+      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_DONE, new AlreadyDoneTransition())
+      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_FAILED,
+          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+      // Ignore-able transitions: TA_KILL leaves a succeeded attempt in TA_SUCCEEDED
+      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_KILL)
+
+      // Transitions from TA_KILLED state
+      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE)
+      // Ignore-able transitions: every event registered below keeps the attempt in TA_KILLED
+      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
+          EnumSet.of(
+              TaskAttemptEventType.TA_UPDATE))
+      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
+          EnumSet.of(
+              TaskAttemptEventType.TA_LOCAL_KILLED,
+              TaskAttemptEventType.TA_KILL,
+              TaskAttemptEventType.TA_ASSIGNED,
+              TaskAttemptEventType.TA_DONE),
+          new TaskKilledCompleteTransition())
+      .installTopology();
+
+  private final StateMachine<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+    stateMachine;
+
+
+  public TaskAttempt(final TaskAttemptScheduleContext scheduleContext,
+                     final TaskAttemptId id, final Task task,
+                     final EventHandler eventHandler) {
+    this.scheduleContext = scheduleContext;
+    this.id = id;
+    this.expire = TaskAttempt.EXPIRE_TIME;
+    this.task = task;
+    this.eventHandler = eventHandler;
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+
+    stateMachine = stateMachineFactory.make(this);
+  }
+
+  public TaskAttemptState getState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public TaskAttemptId getId() {
+    return this.id;
+  }
+
+  public boolean isLeafTask() {
+    return this.task.isLeafTask();
+  }
+
+  public Task getTask() {
+    return this.task;
+  }
+
+  public WorkerConnectionInfo getWorkerConnectionInfo() {
+    return this.workerConnectionInfo;
+  }
+
+  public void setContainerId(TajoContainerId containerId) {
+    this.containerId = containerId;
+  }
+
+  public synchronized void setExpireTime(int expire) {
+    this.expire = expire;
+  }
+
+  public synchronized void updateExpireTime(int period) {
+    this.setExpireTime(this.expire - period);
+  }
+
+  public synchronized void resetExpireTime() {
+    this.setExpireTime(TaskAttempt.EXPIRE_TIME);
+  }
+
+  public synchronized int getLeftTime() {
+    return this.expire;  // read under the same monitor the expire-time setters write under
+  }
+
+  public float getProgress() {
+    return progress;
+  }
+
+  public TableStats getInputStats() {
+    if (inputStats == null) {
+      return null;
+    }
+
+    return new TableStats(inputStats);
+  }
+
+  public TableStats getResultStats() {
+    if (resultStats == null) {
+      return null;
+    }
+    return new TableStats(resultStats);
+  }
+
+  /** Applies a completion report: progress, shuffle outputs and statistics of this attempt and its task. */
+  private void fillTaskStatistics(TaskCompletionReport report) {
+    this.progress = 1.0f;
+    final Task owner = this.getTask();
+
+    final List<IntermediateEntry> entries = new ArrayList<IntermediateEntry>();
+    if (report.getShuffleFileOutputsCount() > 0) {
+      owner.setShuffleFileOutputs(report.getShuffleFileOutputsList());
+      final WorkerConnectionInfo worker = getWorkerConnectionInfo();
+      final PullHost pullHost = new PullHost(worker.getHost(), worker.getPullServerPort());
+      for (ShuffleFileOutput output : report.getShuffleFileOutputsList()) {
+        entries.add(new IntermediateEntry(getId().getTaskId().getId(),
+            getId().getId(), output.getPartId(), pullHost, output.getVolume()));
+      }
+    }
+    owner.setIntermediateData(entries);
+
+    if (report.hasInputStats()) {
+      this.inputStats = report.getInputStats();
+    }
+    if (report.hasResultStats()) {
+      this.resultStats = report.getResultStats();
+      owner.setStats(new TableStats(this.resultStats));
+    }
+  }
+
+  private static class TaskAttemptScheduleTransition implements
+      SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(TaskAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
+      taskAttempt.eventHandler.handle(new TaskAttemptToSchedulerEvent(
+          EventType.T_SCHEDULE, taskAttempt.getTask().getId().getExecutionBlockId(),
+          taskAttempt.scheduleContext, taskAttempt));
+    }
+  }
+
+  private static class KillUnassignedTaskTransition implements
+      SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(TaskAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
+      taskAttempt.eventHandler.handle(new TaskAttemptToSchedulerEvent(
+          EventType.T_SCHEDULE_CANCEL, taskAttempt.getTask().getId().getExecutionBlockId(),
+          taskAttempt.scheduleContext, taskAttempt));
+    }
+  }
+
+  private static class LaunchTransition
+      implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(TaskAttempt taskAttempt,
+                           TaskAttemptEvent event) {
+      TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event;
+      taskAttempt.containerId = castEvent.getContainerId();
+      taskAttempt.workerConnectionInfo = castEvent.getWorkerConnectionInfo();
+      taskAttempt.eventHandler.handle(
+          new TaskTAttemptEvent(taskAttempt.getId(),
+              TaskEventType.T_ATTEMPT_LAUNCHED));
+    }
+  }
+
+  private static class TaskKilledCompleteTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(TaskAttempt taskAttempt,
+                           TaskAttemptEvent event) {
+      taskAttempt.getTask().handle(new TaskEvent(taskAttempt.getId().getTaskId(),
+          TaskEventType.T_ATTEMPT_KILLED));
+      LOG.info(taskAttempt.getId() + " Received TA_KILLED Status from LocalTask");
+    }
+  }
+
+  private static class StatusUpdateTransition
+      implements MultipleArcTransition<TaskAttempt, TaskAttemptEvent, TaskAttemptState> {
+
+    @Override
+    public TaskAttemptState transition(TaskAttempt taskAttempt,
+                                       TaskAttemptEvent event) {
+      TaskAttemptStatusUpdateEvent updateEvent = (TaskAttemptStatusUpdateEvent) event;
+
+      taskAttempt.progress = updateEvent.getStatus().getProgress();
+      taskAttempt.inputStats = updateEvent.getStatus().getInputStats();
+      taskAttempt.resultStats = updateEvent.getStatus().getResultStats();
+
+      return TaskAttemptState.TA_RUNNING;
+    }
+  }
+
+  private void addDiagnosticInfo(String diag) {
+    if (diag != null && !diag.equals("")) {
+      diagnostics.add(diag);
+    }
+  }
+
+  private static class AlreadyAssignedTransition
+      implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{
+
+    @Override
+    public void transition(TaskAttempt taskAttempt,
+                           TaskAttemptEvent taskAttemptEvent) {
+    }
+  }
+
+  private static class AlreadyDoneTransition
+      implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{
+
+    @Override
+    public void transition(TaskAttempt taskAttempt,
+                           TaskAttemptEvent taskAttemptEvent) {
+    }
+  }
+
+  private static class SucceededTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
+    @Override
+    public void transition(TaskAttempt taskAttempt,
+                           TaskAttemptEvent event) {
+      TaskCompletionReport report = ((TaskCompletionEvent)event).getReport();
+
+      try {
+        taskAttempt.fillTaskStatistics(report);
+        taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_SUCCEEDED));
+      } catch (Throwable t) {
+        taskAttempt.eventHandler.handle(new TaskFatalErrorEvent(taskAttempt.getId(), t.getMessage()));
+        taskAttempt.addDiagnosticInfo(ExceptionUtils.getStackTrace(t));
+      }
+    }
+  }
+
+  private static class KillTaskTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) {
+      taskAttempt.eventHandler.handle(new LocalTaskEvent(taskAttempt.getId(), taskAttempt.containerId,
+          LocalTaskEventType.KILL));
+    }
+  }
+
+  private static class FailedTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{
+    @Override
+    public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) {
+      TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event;
+      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED));
+      taskAttempt.addDiagnosticInfo(errorEvent.errorMessage());
+      LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getWorkerConnectionInfo().getHost()
+          + " >> " + errorEvent.errorMessage());
+    }
+  }
+
+  /**
+   * Applies {@code event} to the state machine while holding the write lock. An event that is
+   * invalid for the current state is logged and reported to the stage as an internal error.
+   */
+  @Override
+  public void handle(TaskAttemptEvent event) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getTaskAttemptId() + " of type " + event.getType());
+    }
+    // Lock before try so that a failed lock() never reaches the unlock() in finally.
+    writeLock.lock();
+    try {
+      TaskAttemptState oldState = getState();
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state of " + event.getTaskAttemptId() + ")"
+            + ", eventType:" + event.getType().name()
+            + ", oldState:" + oldState.name()
+            + ", nextState:" + getState().name()
+            , e);
+        eventHandler.handle(
+            new StageDiagnosticsUpdateEvent(event.getTaskAttemptId().getTaskId().getExecutionBlockId(),
+                "Can't handle this event at current state of " + event.getTaskAttemptId() + ")"));
+        eventHandler.handle(
+            new StageEvent(event.getTaskAttemptId().getTaskId().getExecutionBlockId(),
+                StageEventType.SQ_INTERNAL_ERROR));
+      }
+
+      // Trace the state change, if any; nothing is dispatched here, it is only logged.
+      if (LOG.isDebugEnabled() && oldState != getState()) {
+        LOG.debug(id + " TaskAttempt Transitioned from " + oldState + " to " + getState());
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerContext.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerContext.java
new file mode 100644
index 0000000..b699674
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerContext.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.querymaster.QueryMasterTask;
+
+public class TaskSchedulerContext {
+  private final QueryMasterTask.QueryMasterTaskContext masterContext;  // final fields: set once, below
+  private final boolean isLeafQuery;
+  private final ExecutionBlockId blockId;
+  private int taskSize;          // 0 until setTaskSize is called
+  private int estimatedTaskNum;  // 0 until setEstimatedTaskNum is called
+
+  public TaskSchedulerContext(QueryMasterTask.QueryMasterTaskContext masterContext, boolean isLeafQuery,
+                              ExecutionBlockId blockId) {
+    this.masterContext = masterContext;
+    this.isLeafQuery = isLeafQuery;
+    this.blockId = blockId;
+  }
+
+  public QueryMasterTask.QueryMasterTaskContext getMasterContext() {
+    return masterContext;
+  }
+
+  public boolean isLeafQuery() {
+    return isLeafQuery;
+  }
+
+  public ExecutionBlockId getBlockId() {
+    return blockId;
+  }
+
+  public int getTaskSize() {
+    return taskSize;
+  }
+
+  public void setTaskSize(int taskSize) {
+    this.taskSize = taskSize;
+  }
+
+  public int getEstimatedTaskNum() {
+    return estimatedTaskNum;
+  }
+
+  public void setEstimatedTaskNum(int estimatedTaskNum) {
+    this.estimatedTaskNum = estimatedTaskNum;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerFactory.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerFactory.java
new file mode 100644
index 0000000..2794771
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+public class TaskSchedulerFactory {
+  // volatile: written by whichever thread resolves the class first, then read by all others.
+  private static volatile Class<? extends AbstractTaskScheduler> CACHED_ALGORITHM_CLASS;
+  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
+  private static final Class<?>[] DEFAULT_PARAMS = { TaskSchedulerContext.class, Stage.class };
+
+  /** Returns the class configured as "tajo.querymaster.task-scheduler", cached once resolved. */
+  public static Class<? extends AbstractTaskScheduler> getTaskSchedulerClass(Configuration conf)
+      throws IOException {
+    Class<? extends AbstractTaskScheduler> clazz = CACHED_ALGORITHM_CLASS;
+    if (clazz == null) {
+      clazz = conf.getClass("tajo.querymaster.task-scheduler", null, AbstractTaskScheduler.class);
+      if (clazz == null) {
+        throw new IOException("Task scheduler is null");
+      }
+      CACHED_ALGORITHM_CLASS = clazz;
+    }
+    return clazz;
+  }
+
+  /** Creates a {@code clazz} instance via its cached (TaskSchedulerContext, Stage) constructor. */
+  public static <T extends AbstractTaskScheduler> T get(Class<T> clazz, TaskSchedulerContext context,
+                                                        Stage stage) {
+    try {
+      Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
+      if (constructor == null) {
+        constructor = clazz.getDeclaredConstructor(DEFAULT_PARAMS);
+        constructor.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(clazz, constructor);
+      }
+      return constructor.newInstance(context, stage);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static AbstractTaskScheduler get(Configuration conf, TaskSchedulerContext context, Stage stage)
+      throws IOException {
+    return get(getTaskSchedulerClass(conf), context, stage);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java
deleted file mode 100644
index d9932bd..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.scheduler;
-
-import com.google.common.base.Objects;
-import org.apache.tajo.QueryId;
-
-public class QuerySchedulingInfo {
-  private QueryId queryId;
-  private Integer priority;
-  private Long startTime;
-
-  public QuerySchedulingInfo(QueryId queryId, Integer priority, Long startTime) {
-    this.queryId = queryId;
-    this.priority = priority;
-    this.startTime = startTime;
-  }
-
-  public QueryId getQueryId() {
-    return queryId;
-  }
-
-  public Integer getPriority() {
-    return priority;
-  }
-
-  public Long getStartTime() {
-    return startTime;
-  }
-
-  public String getName() {
-    return queryId.getId();
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(startTime, getName(), priority);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java
deleted file mode 100644
index d74280c..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.scheduler;
-
-import org.apache.tajo.QueryId;
-import org.apache.tajo.master.querymaster.QueryInProgress;
-
-import java.util.List;
-
-public interface Scheduler {
-
-  public Mode getMode();
-
-  public String getName();
-
-  public boolean addQuery(QueryInProgress resource);
-
-  public boolean removeQuery(QueryId queryId);
-
-  public List<QueryInProgress> getRunningQueries();
-
-  public enum Mode {
-    FIFO
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java
deleted file mode 100644
index 9c9b16d..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.scheduler;
-
-import java.util.Comparator;
-
-/**
- * Utility class containing scheduling algorithms used in the scheduler.
- */
-
-public class SchedulingAlgorithms  {
-  /**
-   * Compare Schedulables in order of priority and then submission time, as in
-   * the default FIFO scheduler in Tajo.
-   */
-  public static class FifoComparator implements Comparator<QuerySchedulingInfo> {
-    @Override
-    public int compare(QuerySchedulingInfo q1, QuerySchedulingInfo q2) {
-      int res = q1.getPriority().compareTo(q2.getPriority());
-      if (res == 0) {
-        res = (int) Math.signum(q1.getStartTime() - q2.getStartTime());
-      }
-      if (res == 0) {
-        // In the rare case where jobs were submitted at the exact same time,
-        // compare them by name (which will be the QueryId) to get a deterministic ordering
-        res = q1.getName().compareTo(q2.getName());
-      }
-      return res;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java
deleted file mode 100644
index a74e606..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.scheduler;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.master.querymaster.QueryInProgress;
-import org.apache.tajo.master.querymaster.QueryJobManager;
-
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class SimpleFifoScheduler implements Scheduler {
-  private static final Log LOG = LogFactory.getLog(SimpleFifoScheduler.class.getName());
-  private LinkedList<QuerySchedulingInfo> pool = new LinkedList<QuerySchedulingInfo>();
-  private final Thread queryProcessor;
-  private AtomicBoolean stopped = new AtomicBoolean();
-  private QueryJobManager manager;
-  private Comparator<QuerySchedulingInfo> COMPARATOR = new SchedulingAlgorithms.FifoComparator();
-
-  public SimpleFifoScheduler(QueryJobManager manager) {
-    this.manager = manager;
-    this.queryProcessor = new Thread(new QueryProcessor());
-    this.queryProcessor.setName("Query Processor");
-  }
-
-  @Override
-  public Mode getMode() {
-    return Mode.FIFO;
-  }
-
-  @Override
-  public String getName() {
-    return manager.getName();
-  }
-
-  @Override
-  public boolean addQuery(QueryInProgress queryInProgress) {
-    int qSize = pool.size();
-    if (qSize != 0 && qSize % 100 == 0) {
-      LOG.info("Size of Fifo queue is " + qSize);
-    }
-
-    QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1, queryInProgress.getStartTime());
-    boolean result = pool.add(querySchedulingInfo);
-    if (getRunningQueries().size() == 0) wakeupProcessor();
-    return result;
-  }
-
-  @Override
-  public boolean removeQuery(QueryId queryId) {
-    return pool.remove(getQueryByQueryId(queryId));
-  }
-
-  public QuerySchedulingInfo getQueryByQueryId(QueryId queryId) {
-    for (QuerySchedulingInfo querySchedulingInfo : pool) {
-      if (querySchedulingInfo.getQueryId().equals(queryId)) {
-        return querySchedulingInfo;
-      }
-    }
-    return null;
-  }
-
-  @Override
-  public List<QueryInProgress> getRunningQueries() {
-    return new ArrayList<QueryInProgress>(manager.getRunningQueries());
-  }
-
-  public void start() {
-    queryProcessor.start();
-  }
-
-  public void stop() {
-    if (stopped.getAndSet(true)) {
-      return;
-    }
-    pool.clear();
-    synchronized (queryProcessor) {
-      queryProcessor.interrupt();
-    }
-  }
-
-  private QuerySchedulingInfo pollScheduledQuery() {
-    if (pool.size() > 1) {
-      Collections.sort(pool, COMPARATOR);
-    }
-    return pool.poll();
-  }
-
-  private void wakeupProcessor() {
-    synchronized (queryProcessor) {
-      queryProcessor.notifyAll();
-    }
-  }
-
-  private final class QueryProcessor implements Runnable {
-    @Override
-    public void run() {
-
-      QuerySchedulingInfo query;
-
-      while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
-        query = null;
-        if (getRunningQueries().size() == 0) {
-          query = pollScheduledQuery();
-        }
-
-        if (query != null) {
-          try {
-            manager.startQueryJob(query.getQueryId());
-          } catch (Throwable t) {
-            LOG.fatal("Exception during query startup:", t);
-            manager.stopQuery(query.getQueryId());
-          }
-        }
-
-        synchronized (queryProcessor) {
-          try {
-            queryProcessor.wait(500);
-          } catch (InterruptedException e) {
-            if (stopped.get()) {
-              break;
-            }
-            LOG.warn("Exception during shutdown: ", e);
-          }
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/InvalidSessionException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/session/InvalidSessionException.java b/tajo-core/src/main/java/org/apache/tajo/session/InvalidSessionException.java
new file mode 100644
index 0000000..54c65bf
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/session/InvalidSessionException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.session;
+
+/**
+ * Checked exception reporting a session id that cannot be used.
+ * {@link SessionManager} throws it for ids with no registered session and for
+ * a newly generated id that is already taken.
+ */
+public class InvalidSessionException extends Exception {
+  /**
+   * @param sessionId text quoted in the message after "Invalid session id"
+   */
+  public InvalidSessionException(String sessionId) {
+    super(String.format("Invalid session id \"%s\"", sessionId));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/NoSuchSessionVariableException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/session/NoSuchSessionVariableException.java b/tajo-core/src/main/java/org/apache/tajo/session/NoSuchSessionVariableException.java
new file mode 100644
index 0000000..be90449
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/session/NoSuchSessionVariableException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.session;
+
+/**
+ * Checked exception reporting that a session holds no variable of the requested
+ * name; thrown by {@link Session#getVariable(String)}.
+ */
+public class NoSuchSessionVariableException extends Exception {
+  /**
+   * @param varname variable name quoted in the message
+   */
+  public NoSuchSessionVariableException(String varname) {
+    super(String.format("No such session variable \"%s\"", varname));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/Session.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/session/Session.java b/tajo-core/src/main/java/org/apache/tajo/session/Session.java
new file mode 100644
index 0000000..7ac4f85
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/session/Session.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.session;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.common.ProtoObject;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.SessionProto;
+
+/**
+ * State kept for one client session: its id, owning user, current database,
+ * session variables and the non-forward query result scanners registered
+ * under it.
+ *
+ * <p>Locking as visible in this class: the variable map and the scanner map are
+ * each guarded by their own monitor, and the current database is read and
+ * changed through methods synchronized on the session itself.</p>
+ */
+public class Session implements SessionConstants, ProtoObject<SessionProto>, Cloneable {
+  private static final Log LOG = LogFactory.getLog(Session.class);
+
+  private final String sessionId;
+  private final String userName;
+  private String currentDatabase;
+  private final Map<String, String> sessionVariables;
+  private final Map<QueryId, NonForwardQueryResultScanner> nonForwardQueryMap = new HashMap<QueryId, NonForwardQueryResultScanner>();
+
+  // transient status
+  private volatile long lastAccessTime;
+
+  /**
+   * Creates a new session whose last access time is the current time.
+   *
+   * @param sessionId    id of the session
+   * @param userName     name of the user owning the session
+   * @param databaseName database selected initially
+   */
+  public Session(String sessionId, String userName, String databaseName) {
+    this.sessionId = sessionId;
+    this.userName = userName;
+    this.currentDatabase = databaseName;
+    this.lastAccessTime = System.currentTimeMillis();
+
+    this.sessionVariables = new HashMap<String, String>();
+    sessionVariables.put(SessionVars.SESSION_ID.keyname(), sessionId);
+    sessionVariables.put(SessionVars.USERNAME.keyname(), userName);
+    selectDatabase(databaseName);
+  }
+
+  /**
+   * Rebuilds a session from its protobuf form.
+   *
+   * @param proto serialized session to read the fields from
+   */
+  public Session(SessionProto proto) {
+    sessionId = proto.getSessionId();
+    userName = proto.getUsername();
+    currentDatabase = proto.getCurrentDatabase();
+    lastAccessTime = proto.getLastAccessTime();
+    KeyValueSet keyValueSet = new KeyValueSet(proto.getVariables());
+    sessionVariables = keyValueSet.getAllKeyValus();
+  }
+
+  /** @return the id of this session */
+  public String getSessionId() {
+    return sessionId;
+  }
+
+  /** @return the name of the user owning this session */
+  public String getUserName() {
+    return userName;
+  }
+
+  /** Sets the last access time to {@link System#currentTimeMillis()}. */
+  public void updateLastAccessTime() {
+    lastAccessTime = System.currentTimeMillis();
+  }
+
+  /** @return the last access time recorded for this session */
+  public long getLastAccessTime() {
+    return lastAccessTime;
+  }
+
+  /**
+   * Stores a session variable under the name returned by
+   * {@link SessionVars#handleDeprecatedName(String)}.
+   *
+   * @param name  variable name
+   * @param value value to store
+   */
+  public void setVariable(String name, String value) {
+    synchronized (sessionVariables) {
+      sessionVariables.put(SessionVars.handleDeprecatedName(name), value);
+    }
+  }
+
+  /**
+   * Looks up a session variable.
+   *
+   * @param name variable name
+   * @return the value stored for the variable
+   * @throws NoSuchSessionVariableException if no value is stored for the variable
+   */
+  public String getVariable(String name) throws NoSuchSessionVariableException {
+    // Resolve the name the same way setVariable() and removeVariable() do and use
+    // that key for both the existence check and the read. Checking the raw name
+    // would reject a variable that was stored under its resolved name.
+    String key = SessionVars.handleDeprecatedName(name);
+    synchronized (sessionVariables) {
+      if (!sessionVariables.containsKey(key)) {
+        throw new NoSuchSessionVariableException(name);
+      }
+      return sessionVariables.get(key);
+    }
+  }
+
+  /**
+   * Removes a session variable; the name is resolved as in {@link #setVariable}.
+   *
+   * @param name variable name
+   */
+  public void removeVariable(String name) {
+    synchronized (sessionVariables) {
+      sessionVariables.remove(SessionVars.handleDeprecatedName(name));
+    }
+  }
+
+  /**
+   * Returns an immutable snapshot of the session variables. As a side effect the
+   * session id, user name, last access time and current database are written
+   * into the variable map before the snapshot is taken.
+   *
+   * @return immutable copy of the variable map
+   */
+  public synchronized Map<String, String> getAllVariables() {
+    synchronized (sessionVariables) {
+      sessionVariables.put(SessionVars.SESSION_ID.keyname(), sessionId);
+      sessionVariables.put(SessionVars.USERNAME.keyname(), userName);
+      sessionVariables.put(SessionVars.SESSION_LAST_ACCESS_TIME.keyname(), String.valueOf(lastAccessTime));
+      sessionVariables.put(SessionVars.CURRENT_DATABASE.keyname(), currentDatabase);
+      return ImmutableMap.copyOf(sessionVariables);
+    }
+  }
+
+  /**
+   * Changes the current database of this session.
+   *
+   * @param databaseName database to select
+   */
+  public synchronized void selectDatabase(String databaseName) {
+    this.currentDatabase = databaseName;
+  }
+
+  /** @return the currently selected database */
+  public synchronized String getCurrentDatabase() {
+    return currentDatabase;
+  }
+
+  /**
+   * Serializes the id, user name, current database, last access time and the
+   * session variables into a {@link SessionProto}.
+   */
+  @Override
+  public SessionProto getProto() {
+    SessionProto.Builder builder = SessionProto.newBuilder();
+    builder.setSessionId(getSessionId());
+    builder.setUsername(getUserName());
+    builder.setCurrentDatabase(getCurrentDatabase());
+    builder.setLastAccessTime(lastAccessTime);
+    KeyValueSet variables = new KeyValueSet();
+
+    synchronized (sessionVariables) {
+      variables.putAll(this.sessionVariables);
+      builder.setVariables(variables.getProto());
+      return builder.build();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "user=" + getUserName() + ",id=" + getSessionId() +",last_atime=" + getLastAccessTime();
+  }
+
+  /**
+   * Returns a copy of this session made with {@code super.clone()}.
+   *
+   * NOTE(review): {@code super.clone()} copies field references, so the copy
+   * appears to share the variable map and the scanner map with this session, and
+   * the {@code putAll} below writes into that shared map. Confirm whether callers
+   * expect an independent copy.
+   */
+  @Override
+  public Session clone() throws CloneNotSupportedException {
+    Session newSession = (Session) super.clone();
+    newSession.sessionVariables.putAll(getAllVariables());
+    return newSession;
+  }
+
+  /**
+   * @param queryId id of the query
+   * @return the scanner registered for the query, or {@code null} if there is none
+   */
+  public NonForwardQueryResultScanner getNonForwardQueryResultScanner(QueryId queryId) {
+    synchronized (nonForwardQueryMap) {
+      return nonForwardQueryMap.get(queryId);
+    }
+  }
+
+  /**
+   * Registers a scanner under the query id it reports.
+   *
+   * @param resultScanner scanner to register
+   */
+  public void addNonForwardQueryResultScanner(NonForwardQueryResultScanner resultScanner) {
+    synchronized (nonForwardQueryMap) {
+      nonForwardQueryMap.put(resultScanner.getQueryId(), resultScanner);
+    }
+  }
+
+  /**
+   * Unregisters the scanner of the query, if any, and closes it. A failure while
+   * closing is logged and not propagated.
+   *
+   * @param queryId id of the query whose scanner should be closed
+   */
+  public void closeNonForwardQueryResultScanner(QueryId queryId) {
+    NonForwardQueryResultScanner resultScanner;
+    synchronized (nonForwardQueryMap) {
+      resultScanner = nonForwardQueryMap.remove(queryId);
+    }
+
+    if (resultScanner != null) {
+      try {
+        resultScanner.close();
+      } catch (Exception e) {
+        LOG.error("NonForwardQueryResultScanner close error: " + e.getMessage(), e);
+      }
+    }
+  }
+
+  /**
+   * Closes every registered scanner and clears the registry. A failure to close
+   * an individual scanner is logged and does not stop the remaining ones from
+   * being closed; any other throwable is logged and rethrown wrapped in a
+   * {@link RuntimeException}.
+   */
+  public void close() {
+    try {
+      synchronized (nonForwardQueryMap) {
+        for (NonForwardQueryResultScanner eachQueryScanner: nonForwardQueryMap.values()) {
+          try {
+            eachQueryScanner.close();
+          } catch (Exception e) {
+            LOG.error("Error while closing NonForwardQueryResultScanner: " +
+                eachQueryScanner.getSessionId() + ", " + e.getMessage(), e);
+          }
+        }
+
+        nonForwardQueryMap.clear();
+      }
+    } catch (Throwable t) {
+      LOG.error(t.getMessage(), t);
+      throw new RuntimeException(t.getMessage(), t);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/SessionConstants.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/session/SessionConstants.java b/tajo-core/src/main/java/org/apache/tajo/session/SessionConstants.java
new file mode 100644
index 0000000..6c21a27
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/session/SessionConstants.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.session;
+
+public interface SessionConstants {
+  // Currently declares no members. Session lists this interface among the ones it implements.
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/SessionEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/session/SessionEvent.java b/tajo-core/src/main/java/org/apache/tajo/session/SessionEvent.java
new file mode 100644
index 0000000..819fd16
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/session/SessionEvent.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.session;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Event about a single session, identified by its session id, of the kind
+ * handled by {@link SessionManager}.
+ */
+public class SessionEvent extends AbstractEvent<SessionEventType> {
+  private final String sessionId;
+
+  /**
+   * @param sessionId        id of the session the event refers to
+   * @param sessionEventType kind of event
+   */
+  public SessionEvent(String sessionId, SessionEventType sessionEventType) {
+    super(sessionEventType);
+    this.sessionId = sessionId;
+  }
+
+  /** @return id of the session the event refers to */
+  public String getSessionId() {
+    return sessionId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/SessionEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/session/SessionEventType.java b/tajo-core/src/main/java/org/apache/tajo/session/SessionEventType.java
new file mode 100644
index 0000000..8270926
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/session/SessionEventType.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.session;
+
+/**
+ * Kinds of {@link SessionEvent}.
+ */
+public enum SessionEventType {
+  /** Sent by {@link SessionLivelinessMonitor} on expiry; {@link SessionManager} removes the session. */
+  EXPIRE,
+  /** Handled by {@link SessionManager} by refreshing the session's last access time. */
+  PING
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/SessionLivelinessMonitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/session/SessionLivelinessMonitor.java b/tajo-core/src/main/java/org/apache/tajo/session/SessionLivelinessMonitor.java
new file mode 100644
index 0000000..2badccb
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/session/SessionLivelinessMonitor.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.session;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tajo.conf.TajoConf;
+
+/**
+ * Liveliness monitor keyed by session id. When a monitored id expires, a
+ * {@link SessionEvent} of type {@link SessionEventType#EXPIRE} is passed to the
+ * event handler obtained from the dispatcher.
+ */
+public class SessionLivelinessMonitor extends AbstractLivelinessMonitor<String> {
+
+  private EventHandler dispatcher;
+
+  /**
+   * @param d dispatcher whose event handler receives the expiry events
+   */
+  public SessionLivelinessMonitor(Dispatcher d) {
+    super(SessionLivelinessMonitor.class.getSimpleName(), new SystemClock());
+    this.dispatcher = d.getEventHandler();
+  }
+
+  /**
+   * Uses the configured client session expiry time as the expire interval and
+   * one third of it as the monitor interval.
+   *
+   * @param conf configuration; must be a {@link TajoConf}
+   * @throws IllegalArgumentException if {@code conf} is not a {@link TajoConf}
+   */
+  public void serviceInit(Configuration conf) throws Exception {
+    Preconditions.checkArgument(conf instanceof TajoConf);
+
+    // seconds
+    // NOTE(review): confirm that setExpireInterval()/setMonitorInterval() expect
+    // the same unit; Hadoop's AbstractLivelinessMonitor appears to compare the
+    // interval against a millisecond clock.
+    final int expirySetting =
+        ((TajoConf) conf).getIntVar(TajoConf.ConfVars.$CLIENT_SESSION_EXPIRY_TIME);
+    setExpireInterval(expirySetting);
+    setMonitorInterval(expirySetting / 3);
+    super.serviceInit(conf);
+  }
+
+  /** Dispatches an EXPIRE {@link SessionEvent} for the given session id. */
+  @Override
+  protected void expire(String id) {
+    final SessionEvent expired = new SessionEvent(id, SessionEventType.EXPIRE);
+    dispatcher.handle(expired);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/SessionManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/session/SessionManager.java b/tajo-core/src/main/java/org/apache/tajo/session/SessionManager.java
new file mode 100644
index 0000000..571144b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/session/SessionManager.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.session;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Holds the live sessions, keyed by session id, and reacts to
+ * {@link SessionEvent}s such as expiry notifications.
+ */
+public class SessionManager extends CompositeService implements EventHandler<SessionEvent> {
+  private static final Log LOG = LogFactory.getLog(SessionManager.class);
+
+  public final ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>();
+  private final Dispatcher dispatcher;
+  private SessionLivelinessMonitor sessionLivelinessMonitor;
+
+
+  /**
+   * @param dispatcher dispatcher handed to the session liveliness monitor
+   */
+  public SessionManager(Dispatcher dispatcher) {
+    super(SessionManager.class.getSimpleName());
+    this.dispatcher = dispatcher;
+  }
+
+  /** Creates the liveliness monitor and registers it through addIfService(). */
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    sessionLivelinessMonitor = new SessionLivelinessMonitor(dispatcher);
+    addIfService(sessionLivelinessMonitor);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  /**
+   * Fetches the session with a single map read, refreshes its last access time
+   * and pings the liveliness monitor. Using one read instead of a separate
+   * existence check followed by a lookup means a concurrent removal surfaces as
+   * {@link InvalidSessionException} rather than a NullPointerException.
+   *
+   * @param sessionId id of the session to refresh
+   * @return the session registered under {@code sessionId}
+   * @throws InvalidSessionException if no session is registered under the id
+   */
+  private Session touchAndGet(String sessionId) throws InvalidSessionException {
+    Session session = sessions.get(sessionId);
+    if (session == null) {
+      throw new InvalidSessionException(sessionId);
+    }
+    session.updateLastAccessTime();
+    sessionLivelinessMonitor.receivedPing(sessionId);
+    return session;
+  }
+
+  /**
+   * Creates a session under a freshly generated random UUID.
+   *
+   * @param username         owner of the session
+   * @param baseDatabaseName database selected initially
+   * @return the id of the new session
+   * @throws InvalidSessionException if the generated id is already in use
+   */
+  public String createSession(String username, String baseDatabaseName) throws InvalidSessionException {
+    String sessionId;
+    Session oldSession;
+
+    // NOTE(review): no call to sessionLivelinessMonitor.register(...) is visible in
+    // this class; confirm that new sessions are actually tracked for expiry.
+    sessionId = UUID.randomUUID().toString();
+    Session newSession = new Session(sessionId, username, baseDatabaseName);
+    oldSession = sessions.putIfAbsent(sessionId, newSession);
+    if (oldSession != null) {
+      throw new InvalidSessionException("Session id is duplicated: " + oldSession.getSessionId());
+    }
+    LOG.info("Session " + sessionId + " is created." );
+    return sessionId;
+  }
+
+  /**
+   * Removes and closes the session registered under the id.
+   *
+   * @param sessionId id of the session to remove
+   * @return the removed session, or {@code null} if none was registered
+   */
+  public Session removeSession(String sessionId) {
+    // remove() is the single atomic step that decides whether a session existed;
+    // a containsKey() pre-check could pass and still see null when two removals race.
+    Session session = sessions.remove(sessionId);
+    if (session == null) {
+      LOG.error("No such session id: " + sessionId);
+      return null;
+    }
+    LOG.info("Session " + sessionId + " is removed.");
+    session.close();
+    return session;
+  }
+
+  /**
+   * Returns the session and marks it as accessed.
+   *
+   * @throws InvalidSessionException if no session is registered under the id
+   */
+  public Session getSession(String sessionId) throws InvalidSessionException {
+    return touchAndGet(sessionId);
+  }
+
+  /**
+   * Sets a variable on the session and marks the session as accessed.
+   *
+   * @throws InvalidSessionException if no session is registered under the id
+   */
+  public void setVariable(String sessionId, String name, String value) throws InvalidSessionException {
+    touchAndGet(sessionId).setVariable(name, value);
+  }
+
+  /**
+   * Reads a variable of the session and marks the session as accessed.
+   *
+   * @throws InvalidSessionException        if no session is registered under the id
+   * @throws NoSuchSessionVariableException if the session has no such variable
+   */
+  public String getVariable(String sessionId, String name)
+      throws InvalidSessionException, NoSuchSessionVariableException {
+    return touchAndGet(sessionId).getVariable(name);
+  }
+
+  /**
+   * Removes a variable from the session and marks the session as accessed.
+   *
+   * @throws InvalidSessionException if no session is registered under the id
+   */
+  public void removeVariable(String sessionId, String name) throws InvalidSessionException {
+    touchAndGet(sessionId).removeVariable(name);
+  }
+
+  /**
+   * Returns all variables of the session and marks the session as accessed.
+   *
+   * @throws InvalidSessionException if no session is registered under the id
+   */
+  public Map<String, String> getAllVariables(String sessionId) throws InvalidSessionException {
+    return touchAndGet(sessionId).getAllVariables();
+  }
+
+  /**
+   * Refreshes the last access time of the session and pings the liveliness monitor.
+   *
+   * @throws InvalidSessionException if no session is registered under the id
+   */
+  public void touch(String sessionId) throws InvalidSessionException {
+    touchAndGet(sessionId);
+  }
+
+  /**
+   * Handles a session event: an EXPIRE event removes the session, any other
+   * event refreshes it.
+   */
+  @Override
+  public void handle(SessionEvent event) {
+    LOG.info("Processing " + event.getSessionId() + " of type " + event.getType());
+
+    if (event.getType() == SessionEventType.EXPIRE) {
+      // No touch before removal: refreshing a session that is about to be
+      // dropped has no effect, and removeSession() already reports unknown ids.
+      Session session = removeSession(event.getSessionId());
+      if (session != null) {
+        LOG.info("[Expired] Session username=" + session.getUserName() + ",sessionid=" + event.getSessionId());
+      }
+      return;
+    }
+
+    try {
+      touch(event.getSessionId());
+    } catch (InvalidSessionException e) {
+      LOG.error(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
index 6050617..d711258 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -24,11 +24,11 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.master.ha.HAService;
-import org.apache.tajo.master.querymaster.QueryInProgress;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
-import org.apache.tajo.master.querymaster.Task;
-import org.apache.tajo.master.querymaster.Stage;
+import org.apache.tajo.ha.HAService;
+import org.apache.tajo.querymaster.QueryInProgress;
+import org.apache.tajo.querymaster.QueryMasterTask;
+import org.apache.tajo.querymaster.Task;
+import org.apache.tajo.querymaster.Stage;
 import org.apache.tajo.util.history.TaskHistory;
 import org.apache.tajo.util.history.StageHistory;
 import org.apache.tajo.worker.TaskRunnerHistory;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
index 932f584..c3f0087 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoWorkerProtocol.TaskHistoryProto;
-import org.apache.tajo.master.querymaster.QueryInfo;
+import org.apache.tajo.master.QueryInfo;
 
 import java.io.EOFException;
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
index 5934885..9eb58da 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.querymaster.QueryInfo;
+import org.apache.tajo.master.QueryInfo;
 import org.apache.tajo.worker.TaskHistory;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
index 208591f..89c3404 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
@@ -24,8 +24,8 @@ import com.google.common.collect.Lists;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.querymaster.Task;
-import org.apache.tajo.master.querymaster.Repartitioner;
+import org.apache.tajo.querymaster.Task;
+import org.apache.tajo.querymaster.Repartitioner;
 import org.apache.tajo.util.TUtil;
 
 import java.net.URI;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 8944eae..8241478 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -38,9 +38,9 @@ import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.master.event.ContainerAllocationEvent;
 import org.apache.tajo.master.event.ContainerAllocatorEventType;
 import org.apache.tajo.master.event.StageContainerAllocationEvent;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
-import org.apache.tajo.master.querymaster.Stage;
-import org.apache.tajo.master.querymaster.StageState;
+import org.apache.tajo.querymaster.QueryMasterTask;
+import org.apache.tajo.querymaster.Stage;
+import org.apache.tajo.querymaster.StageState;
 import org.apache.tajo.master.rm.*;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 4d96529..09a87e0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -38,9 +38,9 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.ha.TajoMasterInfo;
-import org.apache.tajo.master.querymaster.QueryMaster;
-import org.apache.tajo.master.querymaster.QueryMasterManagerService;
+import org.apache.tajo.ha.TajoMasterInfo;
+import org.apache.tajo.querymaster.QueryMaster;
+import org.apache.tajo.querymaster.QueryMasterManagerService;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.rpc.RpcChannelFactory;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index 1c83110..2ae4bed 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -31,7 +31,7 @@ import org.apache.tajo.ipc.ClientProtos.GetQueryHistoryResponse;
 import org.apache.tajo.ipc.ClientProtos.QueryIdRequest;
 import org.apache.tajo.ipc.ClientProtos.ResultCode;
 import org.apache.tajo.ipc.QueryMasterClientProtocol;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.rpc.BlockingRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.util.NetUtils;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/tajo-default.xml b/tajo-core/src/main/resources/tajo-default.xml
index db92b02..4a92e72 100644
--- a/tajo-core/src/main/resources/tajo-default.xml
+++ b/tajo-core/src/main/resources/tajo-default.xml
@@ -39,7 +39,7 @@
 
   <property>
     <name>tajo.querymaster.task-scheduler</name>
-    <value>org.apache.tajo.master.DefaultTaskScheduler</value>
+    <value>org.apache.tajo.querymaster.DefaultTaskScheduler</value>
   </property>
 
 </configuration>

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
index 8f1d1bc..bc770d7 100644
--- a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
@@ -24,7 +24,7 @@
 <%@ page import="org.apache.tajo.catalog.TableDesc" %>
 <%@ page import="org.apache.tajo.catalog.partition.PartitionMethodDesc" %>
 <%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.master.ha.HAService" %>
+<%@ page import="org.apache.tajo.ha.HAService" %>
 <%@ page import="org.apache.tajo.util.FileUtil" %>
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
 <%@ page import="java.util.Collection" %>

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/admin/cluster.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/cluster.jsp b/tajo-core/src/main/resources/webapps/admin/cluster.jsp
index 6fe21a2..1fb5e40 100644
--- a/tajo-core/src/main/resources/webapps/admin/cluster.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/cluster.jsp
@@ -21,8 +21,8 @@
 
 <%@ page import="org.apache.tajo.master.TajoMaster" %>
 <%@ page import="org.apache.tajo.master.cluster.WorkerConnectionInfo" %>
-<%@ page import="org.apache.tajo.master.ha.HAService" %>
-<%@ page import="org.apache.tajo.master.ha.TajoMasterInfo" %>
+<%@ page import="org.apache.tajo.ha.HAService" %>
+<%@ page import="org.apache.tajo.ha.TajoMasterInfo" %>
 <%@ page import="org.apache.tajo.master.rm.Worker" %>
 <%@ page import="org.apache.tajo.master.rm.WorkerResource" %>
 <%@ page import="org.apache.tajo.master.rm.WorkerState" %>

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp
index 6778725..00186d7 100644
--- a/tajo-core/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/index.jsp
@@ -23,9 +23,9 @@
 <%@ page import="org.apache.tajo.conf.TajoConf" %>
 <%@ page import="org.apache.tajo.ipc.TajoMasterProtocol" %>
 <%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.master.ha.HAService" %>
-<%@ page import="org.apache.tajo.master.ha.TajoMasterInfo" %>
-<%@ page import="org.apache.tajo.master.querymaster.QueryInProgress" %>
+<%@ page import="org.apache.tajo.ha.HAService" %>
+<%@ page import="org.apache.tajo.ha.TajoMasterInfo" %>
+<%@ page import="org.apache.tajo.querymaster.QueryInProgress" %>
 <%@ page import="org.apache.tajo.master.rm.Worker" %>
 <%@ page import="org.apache.tajo.master.rm.WorkerState" %>
 <%@ page import="org.apache.tajo.util.NetUtils" %>

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp
index 5afb3b2..4d8e5e6 100644
--- a/tajo-core/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query.jsp
@@ -20,7 +20,7 @@
 <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
 
 <%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.master.querymaster.QueryInProgress" %>
+<%@ page import="org.apache.tajo.querymaster.QueryInProgress" %>
 <%@ page import="org.apache.tajo.master.rm.Worker" %>
 <%@ page import="org.apache.tajo.util.JSPUtil" %>
 <%@ page import="org.apache.tajo.util.StringUtils" %>
@@ -28,7 +28,7 @@
 <%@ page import="java.text.SimpleDateFormat" %>
 <%@ page import="java.util.*" %>
 <%@ page import="org.apache.tajo.util.history.HistoryReader" %>
-<%@ page import="org.apache.tajo.master.querymaster.QueryInfo" %>
+<%@ page import="org.apache.tajo.master.QueryInfo" %>
 
 <%
   TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
index 9ff6625..82836ac 100644
--- a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
@@ -19,7 +19,7 @@
 %>
 <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
 <%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.master.ha.HAService" %>
+<%@ page import="org.apache.tajo.ha.HAService" %>
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
 
 <%

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/worker/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/index.jsp b/tajo-core/src/main/resources/webapps/worker/index.jsp
index 866d663..bb72f9e 100644
--- a/tajo-core/src/main/resources/webapps/worker/index.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/index.jsp
@@ -19,8 +19,8 @@
 %>
 <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
 
-<%@ page import="org.apache.tajo.master.querymaster.Query" %>
-<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %>
+<%@ page import="org.apache.tajo.querymaster.Query" %>
+<%@ page import="org.apache.tajo.querymaster.QueryMasterTask" %>
 <%@ page import="org.apache.tajo.util.JSPUtil" %>
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
 <%@ page import="org.apache.tajo.worker.TajoWorker" %>

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/worker/querydetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/querydetail.jsp b/tajo-core/src/main/resources/webapps/worker/querydetail.jsp
index 340eb95..56bdeba 100644
--- a/tajo-core/src/main/resources/webapps/worker/querydetail.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/querydetail.jsp
@@ -20,8 +20,8 @@
 <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
 
 <%@ page import="org.apache.tajo.QueryId" %>
-<%@ page import="org.apache.tajo.master.querymaster.Query" %>
-<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %>
+<%@ page import="org.apache.tajo.querymaster.Query" %>
+<%@ page import="org.apache.tajo.querymaster.QueryMasterTask" %>
 <%@ page import="org.apache.tajo.util.JSPUtil" %>
 <%@ page import="org.apache.tajo.util.TajoIdUtils" %>
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/worker/queryplan.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/queryplan.jsp b/tajo-core/src/main/resources/webapps/worker/queryplan.jsp
index 88de97d..878efe3 100644
--- a/tajo-core/src/main/resources/webapps/worker/queryplan.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/queryplan.jsp
@@ -21,11 +21,11 @@
 
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
 <%@ page import="org.apache.tajo.worker.*" %>
-<%@ page import="org.apache.tajo.master.querymaster.Query" %>
+<%@ page import="org.apache.tajo.querymaster.Query" %>
 <%@ page import="org.apache.tajo.QueryId" %>
 <%@ page import="org.apache.tajo.util.TajoIdUtils" %>
-<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %>
-<%@ page import="org.apache.tajo.master.querymaster.Stage" %>
+<%@ page import="org.apache.tajo.querymaster.QueryMasterTask" %>
+<%@ page import="org.apache.tajo.querymaster.Stage" %>
 <%@ page import="org.apache.tajo.engine.planner.global.ExecutionBlock" %>
 <%@ page import="java.util.*" %>
 <%@ page import="org.apache.tajo.ExecutionBlockId" %>

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
index 3aef49d..6d0e3a2 100644
--- a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
@@ -25,7 +25,7 @@
 <%@ page import="org.apache.tajo.catalog.statistics.TableStats" %>
 <%@ page import="org.apache.tajo.plan.util.PlannerUtil" %>
 <%@ page import="org.apache.tajo.ipc.TajoMasterProtocol" %>
-<%@ page import="org.apache.tajo.master.querymaster.*" %>
+<%@ page import="org.apache.tajo.querymaster.*" %>
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
 <%@ page import="org.apache.tajo.worker.TajoWorker" %>
 <%@ page import="java.text.NumberFormat" %>

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/worker/task.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/task.jsp b/tajo-core/src/main/resources/webapps/worker/task.jsp
index 81b1e6d..17e884a 100644
--- a/tajo-core/src/main/resources/webapps/worker/task.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/task.jsp
@@ -25,10 +25,10 @@
 <%@ page import="org.apache.tajo.catalog.proto.CatalogProtos" %>
 <%@ page import="org.apache.tajo.catalog.statistics.TableStats" %>
 <%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %>
-<%@ page import="org.apache.tajo.master.querymaster.Query" %>
-<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %>
-<%@ page import="org.apache.tajo.master.querymaster.Task" %>
-<%@ page import="org.apache.tajo.master.querymaster.Stage" %>
+<%@ page import="org.apache.tajo.querymaster.Query" %>
+<%@ page import="org.apache.tajo.querymaster.QueryMasterTask" %>
+<%@ page import="org.apache.tajo.querymaster.Task" %>
+<%@ page import="org.apache.tajo.querymaster.Stage" %>
 <%@ page import="org.apache.tajo.storage.DataLocation" %>
 <%@ page import="org.apache.tajo.storage.fragment.FileFragment" %>
 <%@ page import="org.apache.tajo.storage.fragment.FragmentConvertor" %>

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
index 8bee6fb..e464446 100644
--- a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
+++ b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
@@ -35,7 +35,7 @@ import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.master.session.Session;
+import org.apache.tajo.session.Session;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.KeyValueSet;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 0d2f6fa..0786912 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -44,10 +44,10 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider;
 import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.querymaster.*;
-import org.apache.tajo.master.querymaster.Query;
-import org.apache.tajo.master.querymaster.Stage;
-import org.apache.tajo.master.querymaster.StageState;
+import org.apache.tajo.querymaster.*;
+import org.apache.tajo.querymaster.Query;
+import org.apache.tajo.querymaster.Stage;
+import org.apache.tajo.querymaster.StageState;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.plan.rewrite.LogicalPlanTestRuleProvider;
 import org.apache.tajo.util.CommonTestingUtil;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index 889d61c..0b59bc7 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -37,7 +37,7 @@ import org.apache.tajo.engine.function.builtin.SumInt;
 import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.master.session.Session;
+import org.apache.tajo.session.Session;
 import org.apache.tajo.plan.*;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.*;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 6db76ae..d3ab1fd 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -44,7 +44,7 @@ import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.master.session.Session;
+import org.apache.tajo.session.Session;
 import org.apache.tajo.plan.LogicalOptimizer;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
index 1a212b0..d1756e1 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
@@ -24,10 +24,10 @@ import org.apache.tajo.*;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.master.querymaster.Query;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
-import org.apache.tajo.master.querymaster.Task;
-import org.apache.tajo.master.querymaster.Stage;
+import org.apache.tajo.querymaster.Query;
+import org.apache.tajo.querymaster.QueryMasterTask;
+import org.apache.tajo.querymaster.Task;
+import org.apache.tajo.querymaster.Stage;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.TUtil;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
index 68b3fb3..39b58d0 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
@@ -32,7 +32,7 @@ import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.jdbc.TajoResultSet;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.KeyValueSet;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
index 3400752..cacef96 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -38,7 +38,7 @@ import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.jdbc.TajoResultSet;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.CommonTestingUtil;