You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/01/08 16:36:10 UTC
[02/13] tajo git commit: TAJO-1288: Refactoring
org.apache.tajo.master package.
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
new file mode 100644
index 0000000..86c49b4
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
@@ -0,0 +1,443 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
+import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
+import org.apache.tajo.querymaster.Task.IntermediateEntry;
+import org.apache.tajo.querymaster.Task.PullHost;
+import org.apache.tajo.master.container.TajoContainerId;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput;
+
+public class TaskAttempt implements EventHandler<TaskAttemptEvent> {
+
+ private static final Log LOG = LogFactory.getLog(TaskAttempt.class);
+
+ // Initial value of the expire countdown (see resetExpireTime); the unit is not visible here,
+ // presumably milliseconds — TODO confirm against the caller of updateExpireTime.
+ private final static int EXPIRE_TIME = 15000;
+
+ private final TaskAttemptId id;
+ private final Task task;
+ // Raw-typed handler that receives every event emitted by the transitions of this class.
+ final EventHandler eventHandler;
+
+ // Both are filled in by LaunchTransition from the assignment event; containerId can also be
+ // set through setContainerId.
+ private TajoContainerId containerId;
+ private WorkerConnectionInfo workerConnectionInfo;
+ // Remaining countdown; written by the constructor and the synchronized expire-time setters.
+ private int expire;
+
+ // The two halves of a single ReentrantReadWriteLock created in the constructor:
+ // getState() takes the read lock, handle() takes the write lock.
+ private final Lock readLock;
+ private final Lock writeLock;
+
+ // Non-empty diagnostic messages collected by addDiagnosticInfo.
+ private final List<String> diagnostics = new ArrayList<String>();
+
+ // Passed through unchanged to the scheduler events built by the schedule/cancel transitions.
+ private final TaskAttemptScheduleContext scheduleContext;
+
+ // Updated by StatusUpdateTransition and fillTaskStatistics.
+ private float progress;
+ private CatalogProtos.TableStatsProto inputStats;
+ private CatalogProtos.TableStatsProto resultStats;
+
+ // Transition table shared by all attempts (static). The constructor calls make(this) on it to
+ // obtain the per-instance state machine. Every attempt starts in TA_NEW.
+ protected static final StateMachineFactory
+ <TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+ stateMachineFactory = new StateMachineFactory
+ <TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+ (TaskAttemptState.TA_NEW)
+
+ // Transitions from TA_NEW state
+ .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
+ TaskAttemptEventType.TA_SCHEDULE, new TaskAttemptScheduleTransition())
+ .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
+ TaskAttemptEventType.TA_RESCHEDULE, new TaskAttemptScheduleTransition())
+ .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_KILLED,
+ TaskAttemptEventType.TA_KILL,
+ new TaskKilledCompleteTransition())
+
+ // Transitions from TA_UNASSIGNED state
+ .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_ASSIGNED,
+ TaskAttemptEventType.TA_ASSIGNED,
+ new LaunchTransition())
+ .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_KILL_WAIT,
+ TaskAttemptEventType.TA_KILL,
+ new KillUnassignedTaskTransition())
+
+ // Transitions from TA_ASSIGNED state
+ .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_ASSIGNED,
+ TaskAttemptEventType.TA_ASSIGNED, new AlreadyAssignedTransition())
+ // NOTE(review): TA_KILL is registered twice for TA_ASSIGNED, once targeting TA_KILL_WAIT and
+ // once targeting TA_KILLED, both with KillTaskTransition. Which registration takes effect
+ // depends on StateMachineFactory — confirm which target is intended and drop the other.
+ .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILL_WAIT,
+ TaskAttemptEventType.TA_KILL,
+ new KillTaskTransition())
+ .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILLED,
+ TaskAttemptEventType.TA_KILL,
+ new KillTaskTransition())
+ // NOTE(review): TA_KILLED is declared as a possible target here, but StatusUpdateTransition
+ // always returns TA_RUNNING.
+ .addTransition(TaskAttemptState.TA_ASSIGNED,
+ EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED),
+ TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
+ .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_SUCCEEDED,
+ TaskAttemptEventType.TA_DONE, new SucceededTransition())
+ .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_FAILED,
+ TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+
+ // Transitions from TA_RUNNING state
+ .addTransition(TaskAttemptState.TA_RUNNING,
+ EnumSet.of(TaskAttemptState.TA_RUNNING),
+ TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
+ .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILL_WAIT,
+ TaskAttemptEventType.TA_KILL,
+ new KillTaskTransition())
+ .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_SUCCEEDED,
+ TaskAttemptEventType.TA_DONE, new SucceededTransition())
+ .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_FAILED,
+ TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+
+ // Transitions from TA_KILL_WAIT state
+ .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
+ TaskAttemptEventType.TA_LOCAL_KILLED,
+ new TaskKilledCompleteTransition())
+ .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
+ TaskAttemptEventType.TA_ASSIGNED,
+ new KillTaskTransition())
+ .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
+ TaskAttemptEventType.TA_SCHEDULE_CANCELED,
+ new TaskKilledCompleteTransition())
+ .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
+ TaskAttemptEventType.TA_DONE,
+ new TaskKilledCompleteTransition())
+ // No hook on this arc: unlike FailedTransition, nothing is sent to the event handler and no
+ // diagnostic is recorded when a fatal error arrives while waiting for the kill.
+ .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_FAILED,
+ TaskAttemptEventType.TA_FATAL_ERROR)
+ // Events accepted in TA_KILL_WAIT without any action or state change.
+ .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
+ EnumSet.of(
+ TaskAttemptEventType.TA_KILL,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ TaskAttemptEventType.TA_UPDATE))
+
+ // Transitions from TA_SUCCEEDED state
+ .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
+ TaskAttemptEventType.TA_UPDATE)
+ .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
+ TaskAttemptEventType.TA_DONE, new AlreadyDoneTransition())
+ .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_FAILED,
+ TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+ // Ignore-able transitions
+ .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
+ TaskAttemptEventType.TA_KILL)
+
+ // Transitions from TA_KILLED state
+ .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE)
+ // Ignore-able transitions
+ .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
+ EnumSet.of(
+ TaskAttemptEventType.TA_UPDATE))
+ // These events re-run TaskKilledCompleteTransition, so the owning task receives
+ // T_ATTEMPT_KILLED again each time one of them arrives in TA_KILLED.
+ .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
+ EnumSet.of(
+ TaskAttemptEventType.TA_LOCAL_KILLED,
+ TaskAttemptEventType.TA_KILL,
+ TaskAttemptEventType.TA_ASSIGNED,
+ TaskAttemptEventType.TA_DONE),
+ new TaskKilledCompleteTransition())
+ .installTopology();
+
+ // Per-attempt state machine built from stateMachineFactory in the constructor; read under
+ // readLock in getState() and driven under writeLock in handle().
+ private final StateMachine<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+ stateMachine;
+
+
+ public TaskAttempt(final TaskAttemptScheduleContext scheduleContext,
+ final TaskAttemptId id, final Task task,
+ final EventHandler eventHandler) {
+ this.scheduleContext = scheduleContext;
+ this.id = id;
+ this.expire = TaskAttempt.EXPIRE_TIME;
+ this.task = task;
+ this.eventHandler = eventHandler;
+
+ ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ this.readLock = readWriteLock.readLock();
+ this.writeLock = readWriteLock.writeLock();
+
+ stateMachine = stateMachineFactory.make(this);
+ }
+
+  /** Returns the state machine's current state, read while holding the read lock. */
+  public TaskAttemptState getState() {
+    final TaskAttemptState current;
+    readLock.lock();
+    try {
+      current = stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+    return current;
+  }
+
+  /** Returns the identifier this attempt was constructed with. */
+  public TaskAttemptId getId() {
+    return id;
+  }
+
+  /** Returns whatever the owning task reports from its own {@code isLeafTask()}. */
+  public boolean isLeafTask() {
+    return task.isLeafTask();
+  }
+
+  /** Returns the task this attempt belongs to. */
+  public Task getTask() {
+    return task;
+  }
+
+  /** Returns the worker connection info, or {@code null} until LaunchTransition has stored one. */
+  public WorkerConnectionInfo getWorkerConnectionInfo() {
+    return workerConnectionInfo;
+  }
+
+  /** Replaces the container id recorded for this attempt. */
+  public void setContainerId(TajoContainerId containerId) {
+    this.containerId = containerId;
+  }
+
+  /** Overwrites the remaining expire countdown with {@code expire}. */
+  public synchronized void setExpireTime(int expire) {
+    this.expire = expire;
+  }
+
+  /** Subtracts {@code period} from the remaining expire countdown. */
+  public synchronized void updateExpireTime(int period) {
+    final int remaining = this.expire - period;
+    setExpireTime(remaining);
+  }
+
+  /** Restores the expire countdown to its initial value, {@code EXPIRE_TIME}. */
+  public synchronized void resetExpireTime() {
+    setExpireTime(EXPIRE_TIME);
+  }
+
+  /**
+   * Returns the remaining expire countdown.
+   * <p>
+   * Synchronized on the same monitor as setExpireTime, updateExpireTime and resetExpireTime so
+   * that a reader on another thread is guaranteed to see their latest write to {@code expire}.
+   */
+  public synchronized int getLeftTime() {
+    return this.expire;
+  }
+
+  /** Returns the most recently recorded progress value. */
+  public float getProgress() {
+    return this.progress;
+  }
+
+  /** Returns a new {@code TableStats} built from the stored input stats, or {@code null} if none are stored. */
+  public TableStats getInputStats() {
+    return inputStats == null ? null : new TableStats(inputStats);
+  }
+
+  /** Returns a new {@code TableStats} built from the stored result stats, or {@code null} if none are stored. */
+  public TableStats getResultStats() {
+    return resultStats == null ? null : new TableStats(resultStats);
+  }
+
+ private void fillTaskStatistics(TaskCompletionReport report) {
+ this.progress = 1.0f;
+
+ List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
+
+ if (report.getShuffleFileOutputsCount() > 0) {
+ this.getTask().setShuffleFileOutputs(report.getShuffleFileOutputsList());
+
+ PullHost host = new PullHost(getWorkerConnectionInfo().getHost(), getWorkerConnectionInfo().getPullServerPort());
+ for (ShuffleFileOutput p : report.getShuffleFileOutputsList()) {
+ IntermediateEntry entry = new IntermediateEntry(getId().getTaskId().getId(),
+ getId().getId(), p.getPartId(), host, p.getVolume());
+ partitions.add(entry);
+ }
+ }
+ this.getTask().setIntermediateData(partitions);
+
+ if (report.hasInputStats()) {
+ this.inputStats = report.getInputStats();
+ }
+ if (report.hasResultStats()) {
+ this.resultStats = report.getResultStats();
+ this.getTask().setStats(new TableStats(resultStats));
+ }
+ }
+
+ private static class TaskAttemptScheduleTransition implements
+ SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
+
+ @Override
+ public void transition(TaskAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
+ taskAttempt.eventHandler.handle(new TaskAttemptToSchedulerEvent(
+ EventType.T_SCHEDULE, taskAttempt.getTask().getId().getExecutionBlockId(),
+ taskAttempt.scheduleContext, taskAttempt));
+ }
+ }
+
+ private static class KillUnassignedTaskTransition implements
+ SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
+
+ @Override
+ public void transition(TaskAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
+ taskAttempt.eventHandler.handle(new TaskAttemptToSchedulerEvent(
+ EventType.T_SCHEDULE_CANCEL, taskAttempt.getTask().getId().getExecutionBlockId(),
+ taskAttempt.scheduleContext, taskAttempt));
+ }
+ }
+
+  /**
+   * Hook for TA_ASSIGNED while unassigned: stores the container id and worker connection info
+   * carried by the assignment event, then emits T_ATTEMPT_LAUNCHED.
+   */
+  private static class LaunchTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) {
+      final TaskAttemptAssignedEvent assignment = (TaskAttemptAssignedEvent) event;
+      taskAttempt.containerId = assignment.getContainerId();
+      taskAttempt.workerConnectionInfo = assignment.getWorkerConnectionInfo();
+
+      final TaskTAttemptEvent launched =
+          new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_LAUNCHED);
+      taskAttempt.eventHandler.handle(launched);
+    }
+  }
+
+ private static class TaskKilledCompleteTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
+
+ @Override
+ public void transition(TaskAttempt taskAttempt,
+ TaskAttemptEvent event) {
+ taskAttempt.getTask().handle(new TaskEvent(taskAttempt.getId().getTaskId(),
+ TaskEventType.T_ATTEMPT_KILLED));
+ LOG.info(taskAttempt.getId() + " Received TA_KILLED Status from LocalTask");
+ }
+ }
+
+  /**
+   * Hook for TA_UPDATE: copies progress, input stats and result stats from the reported status
+   * into the attempt. The resulting state is always TA_RUNNING.
+   */
+  private static class StatusUpdateTransition
+      implements MultipleArcTransition<TaskAttempt, TaskAttemptEvent, TaskAttemptState> {
+
+    @Override
+    public TaskAttemptState transition(TaskAttempt taskAttempt, TaskAttemptEvent event) {
+      final TaskAttemptStatusUpdateEvent statusEvent = (TaskAttemptStatusUpdateEvent) event;
+
+      taskAttempt.progress = statusEvent.getStatus().getProgress();
+      taskAttempt.inputStats = statusEvent.getStatus().getInputStats();
+      taskAttempt.resultStats = statusEvent.getStatus().getResultStats();
+
+      return TaskAttemptState.TA_RUNNING;
+    }
+  }
+
+  /** Appends {@code diag} to the diagnostics list; {@code null} and empty strings are dropped. */
+  private void addDiagnosticInfo(String diag) {
+    if (diag == null || diag.isEmpty()) {
+      return;
+    }
+    diagnostics.add(diag);
+  }
+
+  /** Hook for a repeated TA_ASSIGNED in TA_ASSIGNED: deliberately does nothing. */
+  private static class AlreadyAssignedTransition
+      implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) {
+      // Intentionally empty: the attempt is already assigned.
+    }
+  }
+
+  /** Hook for a repeated TA_DONE in TA_SUCCEEDED: deliberately does nothing. */
+  private static class AlreadyDoneTransition
+      implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) {
+      // Intentionally empty: the attempt has already succeeded.
+    }
+  }
+
+ private static class SucceededTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
+ @Override
+ public void transition(TaskAttempt taskAttempt,
+ TaskAttemptEvent event) {
+ TaskCompletionReport report = ((TaskCompletionEvent)event).getReport();
+
+ try {
+ taskAttempt.fillTaskStatistics(report);
+ taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_SUCCEEDED));
+ } catch (Throwable t) {
+ taskAttempt.eventHandler.handle(new TaskFatalErrorEvent(taskAttempt.getId(), t.getMessage()));
+ taskAttempt.addDiagnosticInfo(ExceptionUtils.getStackTrace(t));
+ }
+ }
+ }
+
+ private static class KillTaskTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
+
+ @Override
+ public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) {
+ taskAttempt.eventHandler.handle(new LocalTaskEvent(taskAttempt.getId(), taskAttempt.containerId,
+ LocalTaskEventType.KILL));
+ }
+ }
+
+ private static class FailedTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{
+ @Override
+ public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) {
+ TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event;
+ taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED));
+ taskAttempt.addDiagnosticInfo(errorEvent.errorMessage());
+ LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getWorkerConnectionInfo().getHost()
+ + " >> " + errorEvent.errorMessage());
+ }
+ }
+
+ @Override
+ public void handle(TaskAttemptEvent event) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing " + event.getTaskAttemptId() + " of type " + event.getType());
+ }
+ try {
+ writeLock.lock();
+ TaskAttemptState oldState = getState();
+ try {
+ stateMachine.doTransition(event.getType(), event);
+ } catch (InvalidStateTransitonException e) {
+ LOG.error("Can't handle this event at current state of " + event.getTaskAttemptId() + ")"
+ + ", eventType:" + event.getType().name()
+ + ", oldState:" + oldState.name()
+ + ", nextState:" + getState().name()
+ , e);
+ eventHandler.handle(
+ new StageDiagnosticsUpdateEvent(event.getTaskAttemptId().getTaskId().getExecutionBlockId(),
+ "Can't handle this event at current state of " + event.getTaskAttemptId() + ")"));
+ eventHandler.handle(
+ new StageEvent(event.getTaskAttemptId().getTaskId().getExecutionBlockId(),
+ StageEventType.SQ_INTERNAL_ERROR));
+ }
+
+ //notify the eventhandler of state change
+ if (LOG.isDebugEnabled()) {
+ if (oldState != getState()) {
+ LOG.debug(id + " TaskAttempt Transitioned from " + oldState + " to "
+ + getState());
+ }
+ }
+ }
+
+ finally {
+ writeLock.unlock();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerContext.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerContext.java
new file mode 100644
index 0000000..b699674
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerContext.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.querymaster.QueryMasterTask;
+
+public class TaskSchedulerContext {
+ private QueryMasterTask.QueryMasterTaskContext masterContext;
+ private boolean isLeafQuery;
+ private ExecutionBlockId blockId;
+ private int taskSize;
+ private int estimatedTaskNum;
+
+ public TaskSchedulerContext(QueryMasterTask.QueryMasterTaskContext masterContext, boolean isLeafQuery,
+ ExecutionBlockId blockId) {
+ this.masterContext = masterContext;
+ this.isLeafQuery = isLeafQuery;
+ this.blockId = blockId;
+ }
+
+ public QueryMasterTask.QueryMasterTaskContext getMasterContext() {
+ return masterContext;
+ }
+
+ public boolean isLeafQuery() {
+ return isLeafQuery;
+ }
+
+ public ExecutionBlockId getBlockId() {
+ return blockId;
+ }
+
+ public int getTaskSize() {
+ return taskSize;
+ }
+
+ public int getEstimatedTaskNum() {
+ return estimatedTaskNum;
+ }
+
+ public void setTaskSize(int taskSize) {
+ this.taskSize = taskSize;
+ }
+
+ public void setEstimatedTaskNum(int estimatedTaskNum) {
+ this.estimatedTaskNum = estimatedTaskNum;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerFactory.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerFactory.java
new file mode 100644
index 0000000..2794771
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskSchedulerFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+/**
+ * Resolves the configured task-scheduler class and instantiates it reflectively through its
+ * {@code (TaskSchedulerContext, Stage)} constructor.
+ */
+public class TaskSchedulerFactory {
+  // volatile: initialised lazily and read without locking, so the write must be visible to
+  // every thread that later takes the fast path.
+  private static volatile Class<? extends AbstractTaskScheduler> CACHED_ALGORITHM_CLASS;
+  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
+  private static final Class<?>[] DEFAULT_PARAMS = { TaskSchedulerContext.class, Stage.class };
+
+  /**
+   * Returns the scheduler class configured under {@code tajo.querymaster.task-scheduler}. The
+   * class is looked up in {@code conf} on first use and cached afterwards.
+   *
+   * @param conf configuration to read the class from when nothing is cached yet
+   * @return the scheduler class, never {@code null}
+   * @throws IOException if the configuration does not name a scheduler class
+   */
+  public static Class<? extends AbstractTaskScheduler> getTaskSchedulerClass(Configuration conf)
+      throws IOException {
+    // Work on a local copy so the volatile field is read once and never returned half-checked.
+    Class<? extends AbstractTaskScheduler> schedulerClass = CACHED_ALGORITHM_CLASS;
+    if (schedulerClass == null) {
+      schedulerClass = conf.getClass("tajo.querymaster.task-scheduler", null, AbstractTaskScheduler.class);
+      if (schedulerClass == null) {
+        throw new IOException("Task scheduler is null");
+      }
+      CACHED_ALGORITHM_CLASS = schedulerClass;
+    }
+    return schedulerClass;
+  }
+
+  /**
+   * Creates a scheduler of type {@code clazz} by invoking its declared
+   * {@code (TaskSchedulerContext, Stage)} constructor; the constructor is cached per class.
+   *
+   * @param clazz   scheduler class to instantiate
+   * @param context first constructor argument
+   * @param stage   second constructor argument
+   * @return the new scheduler instance
+   * @throws RuntimeException wrapping any failure to find or invoke the constructor
+   */
+  public static <T extends AbstractTaskScheduler> T get(Class<T> clazz, TaskSchedulerContext context,
+                                                        Stage stage) {
+    try {
+      // Safe: the cache only ever maps a class to a constructor obtained from that same class.
+      @SuppressWarnings("unchecked")
+      Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
+      if (constructor == null) {
+        constructor = clazz.getDeclaredConstructor(DEFAULT_PARAMS);
+        constructor.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(clazz, constructor);
+      }
+      return constructor.newInstance(context, stage);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Convenience overload: resolves the scheduler class from {@code conf} and instantiates it.
+   *
+   * @throws IOException if the configuration does not name a scheduler class
+   */
+  public static AbstractTaskScheduler get(Configuration conf, TaskSchedulerContext context, Stage stage)
+      throws IOException {
+    return get(getTaskSchedulerClass(conf), context, stage);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java
deleted file mode 100644
index d9932bd..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/scheduler/QuerySchedulingInfo.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.scheduler;
-
-import com.google.common.base.Objects;
-import org.apache.tajo.QueryId;
-
-public class QuerySchedulingInfo {
- private QueryId queryId;
- private Integer priority;
- private Long startTime;
-
- public QuerySchedulingInfo(QueryId queryId, Integer priority, Long startTime) {
- this.queryId = queryId;
- this.priority = priority;
- this.startTime = startTime;
- }
-
- public QueryId getQueryId() {
- return queryId;
- }
-
- public Integer getPriority() {
- return priority;
- }
-
- public Long getStartTime() {
- return startTime;
- }
-
- public String getName() {
- return queryId.getId();
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(startTime, getName(), priority);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java
deleted file mode 100644
index d74280c..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/scheduler/Scheduler.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.scheduler;
-
-import org.apache.tajo.QueryId;
-import org.apache.tajo.master.querymaster.QueryInProgress;
-
-import java.util.List;
-
-public interface Scheduler {
-
- public Mode getMode();
-
- public String getName();
-
- public boolean addQuery(QueryInProgress resource);
-
- public boolean removeQuery(QueryId queryId);
-
- public List<QueryInProgress> getRunningQueries();
-
- public enum Mode {
- FIFO
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java
deleted file mode 100644
index 9c9b16d..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/scheduler/SchedulingAlgorithms.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.scheduler;
-
-import java.util.Comparator;
-
-/**
- * Utility class containing scheduling algorithms used in the scheduler.
- */
-
-public class SchedulingAlgorithms {
- /**
- * Compare Schedulables in order of priority and then submission time, as in
- * the default FIFO scheduler in Tajo.
- */
- public static class FifoComparator implements Comparator<QuerySchedulingInfo> {
- @Override
- public int compare(QuerySchedulingInfo q1, QuerySchedulingInfo q2) {
- int res = q1.getPriority().compareTo(q2.getPriority());
- if (res == 0) {
- res = (int) Math.signum(q1.getStartTime() - q2.getStartTime());
- }
- if (res == 0) {
- // In the rare case where jobs were submitted at the exact same time,
- // compare them by name (which will be the QueryId) to get a deterministic ordering
- res = q1.getName().compareTo(q2.getName());
- }
- return res;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java
deleted file mode 100644
index a74e606..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.scheduler;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.master.querymaster.QueryInProgress;
-import org.apache.tajo.master.querymaster.QueryJobManager;
-
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class SimpleFifoScheduler implements Scheduler {
- private static final Log LOG = LogFactory.getLog(SimpleFifoScheduler.class.getName());
- private LinkedList<QuerySchedulingInfo> pool = new LinkedList<QuerySchedulingInfo>();
- private final Thread queryProcessor;
- private AtomicBoolean stopped = new AtomicBoolean();
- private QueryJobManager manager;
- private Comparator<QuerySchedulingInfo> COMPARATOR = new SchedulingAlgorithms.FifoComparator();
-
- public SimpleFifoScheduler(QueryJobManager manager) {
- this.manager = manager;
- this.queryProcessor = new Thread(new QueryProcessor());
- this.queryProcessor.setName("Query Processor");
- }
-
- @Override
- public Mode getMode() {
- return Mode.FIFO;
- }
-
- @Override
- public String getName() {
- return manager.getName();
- }
-
- @Override
- public boolean addQuery(QueryInProgress queryInProgress) {
- int qSize = pool.size();
- if (qSize != 0 && qSize % 100 == 0) {
- LOG.info("Size of Fifo queue is " + qSize);
- }
-
- QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1, queryInProgress.getStartTime());
- boolean result = pool.add(querySchedulingInfo);
- if (getRunningQueries().size() == 0) wakeupProcessor();
- return result;
- }
-
- @Override
- public boolean removeQuery(QueryId queryId) {
- return pool.remove(getQueryByQueryId(queryId));
- }
-
- public QuerySchedulingInfo getQueryByQueryId(QueryId queryId) {
- for (QuerySchedulingInfo querySchedulingInfo : pool) {
- if (querySchedulingInfo.getQueryId().equals(queryId)) {
- return querySchedulingInfo;
- }
- }
- return null;
- }
-
- @Override
- public List<QueryInProgress> getRunningQueries() {
- return new ArrayList<QueryInProgress>(manager.getRunningQueries());
- }
-
- public void start() {
- queryProcessor.start();
- }
-
- public void stop() {
- if (stopped.getAndSet(true)) {
- return;
- }
- pool.clear();
- synchronized (queryProcessor) {
- queryProcessor.interrupt();
- }
- }
-
- private QuerySchedulingInfo pollScheduledQuery() {
- if (pool.size() > 1) {
- Collections.sort(pool, COMPARATOR);
- }
- return pool.poll();
- }
-
- private void wakeupProcessor() {
- synchronized (queryProcessor) {
- queryProcessor.notifyAll();
- }
- }
-
- private final class QueryProcessor implements Runnable {
- @Override
- public void run() {
-
- QuerySchedulingInfo query;
-
- while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
- query = null;
- if (getRunningQueries().size() == 0) {
- query = pollScheduledQuery();
- }
-
- if (query != null) {
- try {
- manager.startQueryJob(query.getQueryId());
- } catch (Throwable t) {
- LOG.fatal("Exception during query startup:", t);
- manager.stopQuery(query.getQueryId());
- }
- }
-
- synchronized (queryProcessor) {
- try {
- queryProcessor.wait(500);
- } catch (InterruptedException e) {
- if (stopped.get()) {
- break;
- }
- LOG.warn("Exception during shutdown: ", e);
- }
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/InvalidSessionException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/session/InvalidSessionException.java b/tajo-core/src/main/java/org/apache/tajo/session/InvalidSessionException.java
new file mode 100644
index 0000000..54c65bf
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/session/InvalidSessionException.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.session;
+
+ /**
+  * Checked exception raised for a session id that cannot be used.
+  */
+ public class InvalidSessionException extends Exception {
+   /**
+    * @param sessionId the rejected session id; it appears double-quoted in the exception message
+    */
+   public InvalidSessionException(String sessionId) {
+     super(describe(sessionId));
+   }
+
+   /** Builds the exception message for the given session id. */
+   private static String describe(String sessionId) {
+     return "Invalid session id \"" + sessionId + "\"";
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/NoSuchSessionVariableException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/session/NoSuchSessionVariableException.java b/tajo-core/src/main/java/org/apache/tajo/session/NoSuchSessionVariableException.java
new file mode 100644
index 0000000..be90449
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/session/NoSuchSessionVariableException.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.session;
+
+ /**
+  * Checked exception raised when a requested session variable is not set.
+  */
+ public class NoSuchSessionVariableException extends Exception {
+   /**
+    * @param varname name of the missing variable; it appears double-quoted in the exception message
+    */
+   public NoSuchSessionVariableException(String varname) {
+     super(describe(varname));
+   }
+
+   /** Builds the exception message for the given variable name. */
+   private static String describe(String varname) {
+     return "No such session variable \"" + varname + "\"";
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/Session.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/session/Session.java b/tajo-core/src/main/java/org/apache/tajo/session/Session.java
new file mode 100644
index 0000000..7ac4f85
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/session/Session.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.session;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.common.ProtoObject;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.SessionProto;
+
+ /**
+  * State kept for one client session: its id, user name, current database,
+  * session variables, last access time and the non-forward query result
+  * scanners opened within it.
+  *
+  * <p>Session variables are guarded by the monitor of the variable map, the
+  * scanners by the monitor of the scanner map, and the current database by
+  * the monitor of this object.</p>
+  */
+ public class Session implements SessionConstants, ProtoObject<SessionProto>, Cloneable {
+   private static final Log LOG = LogFactory.getLog(Session.class);
+
+   private final String sessionId;
+   private final String userName;
+   private String currentDatabase;
+   private final Map<String, String> sessionVariables;
+   private final Map<QueryId, NonForwardQueryResultScanner> nonForwardQueryMap = new HashMap<QueryId, NonForwardQueryResultScanner>();
+
+   // transient status
+   private volatile long lastAccessTime;
+
+   /**
+    * Creates a fresh session whose last access time is "now" and whose
+    * variable map initially holds only the session id and the user name.
+    *
+    * @param sessionId    id of the session
+    * @param userName     name of the user owning the session
+    * @param databaseName database selected initially
+    */
+   public Session(String sessionId, String userName, String databaseName) {
+     this.sessionId = sessionId;
+     this.userName = userName;
+     this.currentDatabase = databaseName;
+     this.lastAccessTime = System.currentTimeMillis();
+
+     this.sessionVariables = new HashMap<String, String>();
+     sessionVariables.put(SessionVars.SESSION_ID.keyname(), sessionId);
+     sessionVariables.put(SessionVars.USERNAME.keyname(), userName);
+     selectDatabase(databaseName);
+   }
+
+   /**
+    * Restores a session from its protobuf form. Result scanners are not part
+    * of the message, so the restored session starts without any.
+    *
+    * @param proto serialized session
+    */
+   public Session(SessionProto proto) {
+     sessionId = proto.getSessionId();
+     userName = proto.getUsername();
+     currentDatabase = proto.getCurrentDatabase();
+     lastAccessTime = proto.getLastAccessTime();
+     KeyValueSet keyValueSet = new KeyValueSet(proto.getVariables());
+     sessionVariables = keyValueSet.getAllKeyValus();
+   }
+
+   /** @return the id of this session */
+   public String getSessionId() {
+     return sessionId;
+   }
+
+   /** @return the name of the user owning this session */
+   public String getUserName() {
+     return userName;
+   }
+
+   /** Sets the last access time to the current wall-clock time. */
+   public void updateLastAccessTime() {
+     lastAccessTime = System.currentTimeMillis();
+   }
+
+   /** @return the last access time, in milliseconds as returned by {@link System#currentTimeMillis()} */
+   public long getLastAccessTime() {
+     return lastAccessTime;
+   }
+
+   /**
+    * Stores a session variable under the name returned by
+    * {@code SessionVars.handleDeprecatedName(name)}.
+    *
+    * @param name  variable name as given by the caller
+    * @param value value to store
+    */
+   public void setVariable(String name, String value) {
+     synchronized (sessionVariables) {
+       sessionVariables.put(SessionVars.handleDeprecatedName(name), value);
+     }
+   }
+
+   /**
+    * Looks up a session variable.
+    *
+    * @param name variable name as given by the caller
+    * @return the stored value
+    * @throws NoSuchSessionVariableException if no variable is stored under the resolved name
+    */
+   public String getVariable(String name) throws NoSuchSessionVariableException {
+     // Resolve the name once and use it for both the existence check and the
+     // read, so the lookup key is the same one setVariable() stores under.
+     String key = SessionVars.handleDeprecatedName(name);
+     synchronized (sessionVariables) {
+       if (sessionVariables.containsKey(key)) {
+         return sessionVariables.get(key);
+       } else {
+         throw new NoSuchSessionVariableException(name);
+       }
+     }
+   }
+
+   /**
+    * Removes the variable stored under the name returned by
+    * {@code SessionVars.handleDeprecatedName(name)}, if any.
+    *
+    * @param name variable name as given by the caller
+    */
+   public void removeVariable(String name) {
+     synchronized (sessionVariables) {
+       sessionVariables.remove(SessionVars.handleDeprecatedName(name));
+     }
+   }
+
+   /**
+    * Refreshes the bookkeeping entries (session id, user name, last access
+    * time, current database) in the variable map and returns a snapshot.
+    *
+    * @return an immutable copy of all session variables
+    */
+   public synchronized Map<String, String> getAllVariables() {
+     synchronized (sessionVariables) {
+       sessionVariables.put(SessionVars.SESSION_ID.keyname(), sessionId);
+       sessionVariables.put(SessionVars.USERNAME.keyname(), userName);
+       sessionVariables.put(SessionVars.SESSION_LAST_ACCESS_TIME.keyname(), String.valueOf(lastAccessTime));
+       sessionVariables.put(SessionVars.CURRENT_DATABASE.keyname(), currentDatabase);
+       return ImmutableMap.copyOf(sessionVariables);
+     }
+   }
+
+   /** @param databaseName database to make current for this session */
+   public synchronized void selectDatabase(String databaseName) {
+     this.currentDatabase = databaseName;
+   }
+
+   /** @return the database currently selected in this session */
+   public synchronized String getCurrentDatabase() {
+     return currentDatabase;
+   }
+
+   /**
+    * Serializes the session id, user name, current database, last access
+    * time and the session variables.
+    *
+    * @return the protobuf form of this session
+    */
+   @Override
+   public SessionProto getProto() {
+     SessionProto.Builder builder = SessionProto.newBuilder();
+     builder.setSessionId(getSessionId());
+     builder.setUsername(getUserName());
+     builder.setCurrentDatabase(getCurrentDatabase());
+     builder.setLastAccessTime(lastAccessTime);
+     KeyValueSet variables = new KeyValueSet();
+
+     synchronized (sessionVariables) {
+       variables.putAll(this.sessionVariables);
+       builder.setVariables(variables.getProto());
+       return builder.build();
+     }
+   }
+
+   @Override
+   public String toString() {
+     return "user=" + getUserName() + ",id=" + getSessionId() +",last_atime=" + getLastAccessTime();
+   }
+
+   /**
+    * Returns a shallow copy of this session.
+    *
+    * <p>NOTE(review): {@code super.clone()} copies field references only, so
+    * the copy shares the variable map and the scanner map with this session;
+    * confirm that callers do not expect independent state.</p>
+    *
+    * @return the copy
+    * @throws CloneNotSupportedException declared by {@link Object#clone()}
+    */
+   @Override
+   public Session clone() throws CloneNotSupportedException {
+     Session newSession = (Session) super.clone();
+     // Take the snapshot before acquiring the map lock so the lock order stays
+     // the same as in getAllVariables() (this object first, then the map).
+     Map<String, String> snapshot = getAllVariables();
+     synchronized (newSession.sessionVariables) {
+       newSession.sessionVariables.putAll(snapshot);
+     }
+     return newSession;
+   }
+
+   /**
+    * @param queryId id of the query
+    * @return the scanner registered for the query, or {@code null} if there is none
+    */
+   public NonForwardQueryResultScanner getNonForwardQueryResultScanner(QueryId queryId) {
+     synchronized (nonForwardQueryMap) {
+       return nonForwardQueryMap.get(queryId);
+     }
+   }
+
+   /**
+    * Registers a scanner under its own query id, replacing any scanner
+    * previously registered for that id.
+    *
+    * @param resultScanner scanner to register
+    */
+   public void addNonForwardQueryResultScanner(NonForwardQueryResultScanner resultScanner) {
+     synchronized (nonForwardQueryMap) {
+       nonForwardQueryMap.put(resultScanner.getQueryId(), resultScanner);
+     }
+   }
+
+   /**
+    * Unregisters the scanner of the given query and closes it. A failure while
+    * closing is logged and not propagated.
+    *
+    * @param queryId id of the query whose scanner should be closed
+    */
+   public void closeNonForwardQueryResultScanner(QueryId queryId) {
+     NonForwardQueryResultScanner resultScanner;
+     synchronized (nonForwardQueryMap) {
+       resultScanner = nonForwardQueryMap.remove(queryId);
+     }
+
+     // The scanner is closed outside the lock.
+     if (resultScanner != null) {
+       try {
+         resultScanner.close();
+       } catch (Exception e) {
+         LOG.error("NonForwardQueryResultScanne close error: " + e.getMessage(), e);
+       }
+     }
+   }
+
+   /**
+    * Closes every registered scanner and empties the scanner map. A failure
+    * of an individual scanner is logged and does not stop the remaining ones
+    * from being closed.
+    *
+    * @throws RuntimeException wrapping any other {@link Throwable} raised during the cleanup
+    */
+   public void close() {
+     try {
+       synchronized (nonForwardQueryMap) {
+         for (NonForwardQueryResultScanner eachQueryScanner: nonForwardQueryMap.values()) {
+           try {
+             eachQueryScanner.close();
+           } catch (Exception e) {
+             LOG.error("Error while closing NonForwardQueryResultScanner: " +
+                 eachQueryScanner.getSessionId() + ", " + e.getMessage(), e);
+           }
+         }
+
+         nonForwardQueryMap.clear();
+       }
+     } catch (Throwable t) {
+       LOG.error(t.getMessage(), t);
+       throw new RuntimeException(t.getMessage(), t);
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/SessionConstants.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/session/SessionConstants.java b/tajo-core/src/main/java/org/apache/tajo/session/SessionConstants.java
new file mode 100644
index 0000000..6c21a27
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/session/SessionConstants.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.session;
+
+ /**
+ * Interface without members; {@code Session} implements it.
+ */
+ public interface SessionConstants {
+
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/SessionEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/session/SessionEvent.java b/tajo-core/src/main/java/org/apache/tajo/session/SessionEvent.java
new file mode 100644
index 0000000..819fd16
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/session/SessionEvent.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.session;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+ /**
+ * Event that refers to a single session, identified by its session id.
+ */
+ public class SessionEvent extends AbstractEvent<SessionEventType> {
+ // Id of the session this event refers to.
+ private final String sessionId;
+
+ /**
+ * @param sessionId id of the session the event refers to
+ * @param sessionEventType type of the event; handed to the AbstractEvent constructor
+ */
+ public SessionEvent(String sessionId, SessionEventType sessionEventType) {
+ super(sessionEventType);
+ this.sessionId = sessionId;
+ }
+
+ /**
+ * @return the session id given at construction
+ */
+ public String getSessionId() {
+ return sessionId;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/SessionEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/session/SessionEventType.java b/tajo-core/src/main/java/org/apache/tajo/session/SessionEventType.java
new file mode 100644
index 0000000..8270926
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/session/SessionEventType.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.session;
+
+ /**
+ * Types of {@link SessionEvent}.
+ */
+ public enum SessionEventType {
+ // Dispatched by SessionLivelinessMonitor.expire(); SessionManager.handle() removes the session for this type.
+ EXPIRE,
+ // NOTE(review): no producer of this type is visible here; presumably a keep-alive signal - confirm.
+ PING
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/SessionLivelinessMonitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/session/SessionLivelinessMonitor.java b/tajo-core/src/main/java/org/apache/tajo/session/SessionLivelinessMonitor.java
new file mode 100644
index 0000000..2badccb
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/session/SessionLivelinessMonitor.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.session;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tajo.conf.TajoConf;
+
+ /**
+  * Liveliness monitor keyed by session id. When an id expires, a
+  * {@link SessionEvent} of type {@link SessionEventType#EXPIRE} is handed to
+  * the event handler obtained from the dispatcher.
+  */
+ public class SessionLivelinessMonitor extends AbstractLivelinessMonitor<String> {
+
+   private EventHandler dispatcher;
+
+   /**
+    * @param d dispatcher whose event handler receives the expiration events
+    */
+   public SessionLivelinessMonitor(Dispatcher d) {
+     super(SessionLivelinessMonitor.class.getSimpleName(), new SystemClock());
+     this.dispatcher = d.getEventHandler();
+   }
+
+   /**
+    * Reads the client session expiry time from the configuration and sets the
+    * expire interval to it and the monitor interval to one third of it.
+    *
+    * @param conf configuration; must be a {@link TajoConf}
+    * @throws IllegalArgumentException if {@code conf} is not a {@link TajoConf}
+    * @throws Exception if the superclass initialization fails
+    */
+   @Override
+   public void serviceInit(Configuration conf) throws Exception {
+     Preconditions.checkArgument(conf instanceof TajoConf);
+     TajoConf systemConf = (TajoConf) conf;
+
+     // The setting is expressed in seconds, while the intervals of
+     // AbstractLivelinessMonitor are in milliseconds; convert, clamping to the
+     // int range to avoid overflow for very large settings.
+     int expireIntvlSec = systemConf.getIntVar(TajoConf.ConfVars.$CLIENT_SESSION_EXPIRY_TIME);
+     int expireIntvlMs = (int) Math.min((long) Integer.MAX_VALUE, expireIntvlSec * 1000L);
+     setExpireInterval(expireIntvlMs);
+     setMonitorInterval(expireIntvlMs / 3);
+     super.serviceInit(conf);
+   }
+
+   /**
+    * Reports the expiration of a session id as an EXPIRE event.
+    *
+    * @param id the expired session id
+    */
+   @Override
+   protected void expire(String id) {
+     dispatcher.handle(new SessionEvent(id, SessionEventType.EXPIRE));
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/session/SessionManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/session/SessionManager.java b/tajo-core/src/main/java/org/apache/tajo/session/SessionManager.java
new file mode 100644
index 0000000..571144b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/session/SessionManager.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.session;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+ /**
+  * Registry of the live sessions, keyed by session id. It also handles
+  * {@link SessionEvent}s, removing a session when an EXPIRE event arrives.
+  */
+ public class SessionManager extends CompositeService implements EventHandler<SessionEvent> {
+   private static final Log LOG = LogFactory.getLog(SessionManager.class);
+
+   public final ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>();
+   private final Dispatcher dispatcher;
+   private SessionLivelinessMonitor sessionLivelinessMonitor;
+
+
+   /**
+    * @param dispatcher dispatcher passed on to the liveliness monitor
+    */
+   public SessionManager(Dispatcher dispatcher) {
+     super(SessionManager.class.getSimpleName());
+     this.dispatcher = dispatcher;
+   }
+
+   /** Creates the liveliness monitor and adds it as a child service. */
+   @Override
+   public void serviceInit(Configuration conf) throws Exception {
+     sessionLivelinessMonitor = new SessionLivelinessMonitor(dispatcher);
+     addIfService(sessionLivelinessMonitor);
+     super.serviceInit(conf);
+   }
+
+   @Override
+   public void serviceStop() throws Exception {
+     super.serviceStop();
+   }
+
+   /**
+    * Looks the session up with a single map access, records the access on it
+    * and pings the liveliness monitor. Doing the lookup once avoids the window
+    * in which a concurrent removal could turn a passed existence check into a
+    * {@code null} session.
+    *
+    * @throws InvalidSessionException if no session is registered under the id
+    */
+   private Session touchAndGet(String sessionId) throws InvalidSessionException {
+     Session session = sessions.get(sessionId);
+     if (session == null) {
+       throw new InvalidSessionException(sessionId);
+     }
+     session.updateLastAccessTime();
+     sessionLivelinessMonitor.receivedPing(sessionId);
+     return session;
+   }
+
+   /**
+    * Creates a session under a random UUID.
+    *
+    * @param username         user owning the session
+    * @param baseDatabaseName database selected initially
+    * @return the id of the new session
+    * @throws InvalidSessionException if the generated id is already registered
+    */
+   public String createSession(String username, String baseDatabaseName) throws InvalidSessionException {
+     String sessionId;
+     Session oldSession;
+
+     sessionId = UUID.randomUUID().toString();
+     Session newSession = new Session(sessionId, username, baseDatabaseName);
+     oldSession = sessions.putIfAbsent(sessionId, newSession);
+     if (oldSession != null) {
+       throw new InvalidSessionException("Session id is duplicated: " + oldSession.getSessionId());
+     }
+     LOG.info("Session " + sessionId + " is created." );
+     return sessionId;
+   }
+
+   /**
+    * Unregisters and closes a session.
+    *
+    * @param sessionId id of the session to remove
+    * @return the removed session, or {@code null} if the id was not registered
+    */
+   public Session removeSession(String sessionId) {
+     // A single remove() both tests for and takes the session, so a concurrent
+     // removal of the same id cannot lead to close() being called on null.
+     Session session = sessions.remove(sessionId);
+     if (session == null) {
+       LOG.error("No such session id: " + sessionId);
+       return null;
+     }
+     LOG.info("Session " + sessionId + " is removed.");
+     session.close();
+     return session;
+   }
+
+   /**
+    * @return the session registered under the id, after recording the access
+    * @throws InvalidSessionException if no session is registered under the id
+    */
+   public Session getSession(String sessionId) throws InvalidSessionException {
+     return touchAndGet(sessionId);
+   }
+
+   /**
+    * Sets a variable on the session, after recording the access.
+    *
+    * @throws InvalidSessionException if no session is registered under the id
+    */
+   public void setVariable(String sessionId, String name, String value) throws InvalidSessionException {
+     touchAndGet(sessionId).setVariable(name, value);
+   }
+
+   /**
+    * @return the value of the variable on the session, after recording the access
+    * @throws InvalidSessionException        if no session is registered under the id
+    * @throws NoSuchSessionVariableException if the session has no such variable
+    */
+   public String getVariable(String sessionId, String name)
+       throws InvalidSessionException, NoSuchSessionVariableException {
+     return touchAndGet(sessionId).getVariable(name);
+   }
+
+   /**
+    * Removes a variable from the session, after recording the access.
+    *
+    * @throws InvalidSessionException if no session is registered under the id
+    */
+   public void removeVariable(String sessionId, String name) throws InvalidSessionException {
+     touchAndGet(sessionId).removeVariable(name);
+   }
+
+   /**
+    * @return a snapshot of all variables of the session, after recording the access
+    * @throws InvalidSessionException if no session is registered under the id
+    */
+   public Map<String, String> getAllVariables(String sessionId) throws InvalidSessionException {
+     return touchAndGet(sessionId).getAllVariables();
+   }
+
+   /**
+    * Records an access on the session and pings the liveliness monitor.
+    *
+    * @throws InvalidSessionException if no session is registered under the id
+    */
+   public void touch(String sessionId) throws InvalidSessionException {
+     touchAndGet(sessionId);
+   }
+
+   /**
+    * Handles a session event: the session is touched (an unknown id is logged
+    * as an error), and for an EXPIRE event it is then removed.
+    *
+    * <p>NOTE(review): the touch also happens for EXPIRE events, right before
+    * the removal - confirm this is intended.</p>
+    */
+   @Override
+   public void handle(SessionEvent event) {
+     LOG.info("Processing " + event.getSessionId() + " of type " + event.getType());
+
+     try {
+       // touch() already rejects unknown ids, so no separate existence check is needed.
+       touch(event.getSessionId());
+     } catch (InvalidSessionException e) {
+       LOG.error(e);
+     }
+
+     if (event.getType() == SessionEventType.EXPIRE) {
+       Session session = removeSession(event.getSessionId());
+       if (session != null) {
+         LOG.info("[Expired] Session username=" + session.getUserName() + ",sessionid=" + event.getSessionId());
+       }
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
index 6050617..d711258 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -24,11 +24,11 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.master.ha.HAService;
-import org.apache.tajo.master.querymaster.QueryInProgress;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
-import org.apache.tajo.master.querymaster.Task;
-import org.apache.tajo.master.querymaster.Stage;
+import org.apache.tajo.ha.HAService;
+import org.apache.tajo.querymaster.QueryInProgress;
+import org.apache.tajo.querymaster.QueryMasterTask;
+import org.apache.tajo.querymaster.Task;
+import org.apache.tajo.querymaster.Stage;
import org.apache.tajo.util.history.TaskHistory;
import org.apache.tajo.util.history.StageHistory;
import org.apache.tajo.worker.TaskRunnerHistory;
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
index 932f584..c3f0087 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoWorkerProtocol.TaskHistoryProto;
-import org.apache.tajo.master.querymaster.QueryInfo;
+import org.apache.tajo.master.QueryInfo;
import java.io.EOFException;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
index 5934885..9eb58da 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.service.AbstractService;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.querymaster.QueryInfo;
+import org.apache.tajo.master.QueryInfo;
import org.apache.tajo.worker.TaskHistory;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
index 208591f..89c3404 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
@@ -24,8 +24,8 @@ import com.google.common.collect.Lists;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.common.ProtoObject;
import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.querymaster.Task;
-import org.apache.tajo.master.querymaster.Repartitioner;
+import org.apache.tajo.querymaster.Task;
+import org.apache.tajo.querymaster.Repartitioner;
import org.apache.tajo.util.TUtil;
import java.net.URI;
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 8944eae..8241478 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -38,9 +38,9 @@ import org.apache.tajo.master.container.TajoContainerId;
import org.apache.tajo.master.event.ContainerAllocationEvent;
import org.apache.tajo.master.event.ContainerAllocatorEventType;
import org.apache.tajo.master.event.StageContainerAllocationEvent;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
-import org.apache.tajo.master.querymaster.Stage;
-import org.apache.tajo.master.querymaster.StageState;
+import org.apache.tajo.querymaster.QueryMasterTask;
+import org.apache.tajo.querymaster.Stage;
+import org.apache.tajo.querymaster.StageState;
import org.apache.tajo.master.rm.*;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 4d96529..09a87e0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -38,9 +38,9 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ha.HAServiceUtil;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.ha.TajoMasterInfo;
-import org.apache.tajo.master.querymaster.QueryMaster;
-import org.apache.tajo.master.querymaster.QueryMasterManagerService;
+import org.apache.tajo.ha.TajoMasterInfo;
+import org.apache.tajo.querymaster.QueryMaster;
+import org.apache.tajo.querymaster.QueryMasterManagerService;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.rpc.RpcChannelFactory;
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index 1c83110..2ae4bed 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -31,7 +31,7 @@ import org.apache.tajo.ipc.ClientProtos.GetQueryHistoryResponse;
import org.apache.tajo.ipc.ClientProtos.QueryIdRequest;
import org.apache.tajo.ipc.ClientProtos.ResultCode;
import org.apache.tajo.ipc.QueryMasterClientProtocol;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.rpc.BlockingRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.util.NetUtils;
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/tajo-default.xml b/tajo-core/src/main/resources/tajo-default.xml
index db92b02..4a92e72 100644
--- a/tajo-core/src/main/resources/tajo-default.xml
+++ b/tajo-core/src/main/resources/tajo-default.xml
@@ -39,7 +39,7 @@
<property>
<name>tajo.querymaster.task-scheduler</name>
- <value>org.apache.tajo.master.DefaultTaskScheduler</value>
+ <value>org.apache.tajo.querymaster.DefaultTaskScheduler</value>
</property>
</configuration>
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
index 8f1d1bc..bc770d7 100644
--- a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
@@ -24,7 +24,7 @@
<%@ page import="org.apache.tajo.catalog.TableDesc" %>
<%@ page import="org.apache.tajo.catalog.partition.PartitionMethodDesc" %>
<%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.master.ha.HAService" %>
+<%@ page import="org.apache.tajo.ha.HAService" %>
<%@ page import="org.apache.tajo.util.FileUtil" %>
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
<%@ page import="java.util.Collection" %>
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/admin/cluster.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/cluster.jsp b/tajo-core/src/main/resources/webapps/admin/cluster.jsp
index 6fe21a2..1fb5e40 100644
--- a/tajo-core/src/main/resources/webapps/admin/cluster.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/cluster.jsp
@@ -21,8 +21,8 @@
<%@ page import="org.apache.tajo.master.TajoMaster" %>
<%@ page import="org.apache.tajo.master.cluster.WorkerConnectionInfo" %>
-<%@ page import="org.apache.tajo.master.ha.HAService" %>
-<%@ page import="org.apache.tajo.master.ha.TajoMasterInfo" %>
+<%@ page import="org.apache.tajo.ha.HAService" %>
+<%@ page import="org.apache.tajo.ha.TajoMasterInfo" %>
<%@ page import="org.apache.tajo.master.rm.Worker" %>
<%@ page import="org.apache.tajo.master.rm.WorkerResource" %>
<%@ page import="org.apache.tajo.master.rm.WorkerState" %>
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp
index 6778725..00186d7 100644
--- a/tajo-core/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/index.jsp
@@ -23,9 +23,9 @@
<%@ page import="org.apache.tajo.conf.TajoConf" %>
<%@ page import="org.apache.tajo.ipc.TajoMasterProtocol" %>
<%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.master.ha.HAService" %>
-<%@ page import="org.apache.tajo.master.ha.TajoMasterInfo" %>
-<%@ page import="org.apache.tajo.master.querymaster.QueryInProgress" %>
+<%@ page import="org.apache.tajo.ha.HAService" %>
+<%@ page import="org.apache.tajo.ha.TajoMasterInfo" %>
+<%@ page import="org.apache.tajo.querymaster.QueryInProgress" %>
<%@ page import="org.apache.tajo.master.rm.Worker" %>
<%@ page import="org.apache.tajo.master.rm.WorkerState" %>
<%@ page import="org.apache.tajo.util.NetUtils" %>
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp
index 5afb3b2..4d8e5e6 100644
--- a/tajo-core/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query.jsp
@@ -20,7 +20,7 @@
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.master.querymaster.QueryInProgress" %>
+<%@ page import="org.apache.tajo.querymaster.QueryInProgress" %>
<%@ page import="org.apache.tajo.master.rm.Worker" %>
<%@ page import="org.apache.tajo.util.JSPUtil" %>
<%@ page import="org.apache.tajo.util.StringUtils" %>
@@ -28,7 +28,7 @@
<%@ page import="java.text.SimpleDateFormat" %>
<%@ page import="java.util.*" %>
<%@ page import="org.apache.tajo.util.history.HistoryReader" %>
-<%@ page import="org.apache.tajo.master.querymaster.QueryInfo" %>
+<%@ page import="org.apache.tajo.master.QueryInfo" %>
<%
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
index 9ff6625..82836ac 100644
--- a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp
@@ -19,7 +19,7 @@
%>
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.master.ha.HAService" %>
+<%@ page import="org.apache.tajo.ha.HAService" %>
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
<%
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/worker/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/index.jsp b/tajo-core/src/main/resources/webapps/worker/index.jsp
index 866d663..bb72f9e 100644
--- a/tajo-core/src/main/resources/webapps/worker/index.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/index.jsp
@@ -19,8 +19,8 @@
%>
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
-<%@ page import="org.apache.tajo.master.querymaster.Query" %>
-<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %>
+<%@ page import="org.apache.tajo.querymaster.Query" %>
+<%@ page import="org.apache.tajo.querymaster.QueryMasterTask" %>
<%@ page import="org.apache.tajo.util.JSPUtil" %>
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
<%@ page import="org.apache.tajo.worker.TajoWorker" %>
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/worker/querydetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/querydetail.jsp b/tajo-core/src/main/resources/webapps/worker/querydetail.jsp
index 340eb95..56bdeba 100644
--- a/tajo-core/src/main/resources/webapps/worker/querydetail.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/querydetail.jsp
@@ -20,8 +20,8 @@
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<%@ page import="org.apache.tajo.QueryId" %>
-<%@ page import="org.apache.tajo.master.querymaster.Query" %>
-<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %>
+<%@ page import="org.apache.tajo.querymaster.Query" %>
+<%@ page import="org.apache.tajo.querymaster.QueryMasterTask" %>
<%@ page import="org.apache.tajo.util.JSPUtil" %>
<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/worker/queryplan.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/queryplan.jsp b/tajo-core/src/main/resources/webapps/worker/queryplan.jsp
index 88de97d..878efe3 100644
--- a/tajo-core/src/main/resources/webapps/worker/queryplan.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/queryplan.jsp
@@ -21,11 +21,11 @@
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
<%@ page import="org.apache.tajo.worker.*" %>
-<%@ page import="org.apache.tajo.master.querymaster.Query" %>
+<%@ page import="org.apache.tajo.querymaster.Query" %>
<%@ page import="org.apache.tajo.QueryId" %>
<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
-<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %>
-<%@ page import="org.apache.tajo.master.querymaster.Stage" %>
+<%@ page import="org.apache.tajo.querymaster.QueryMasterTask" %>
+<%@ page import="org.apache.tajo.querymaster.Stage" %>
<%@ page import="org.apache.tajo.engine.planner.global.ExecutionBlock" %>
<%@ page import="java.util.*" %>
<%@ page import="org.apache.tajo.ExecutionBlockId" %>
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
index 3aef49d..6d0e3a2 100644
--- a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
@@ -25,7 +25,7 @@
<%@ page import="org.apache.tajo.catalog.statistics.TableStats" %>
<%@ page import="org.apache.tajo.plan.util.PlannerUtil" %>
<%@ page import="org.apache.tajo.ipc.TajoMasterProtocol" %>
-<%@ page import="org.apache.tajo.master.querymaster.*" %>
+<%@ page import="org.apache.tajo.querymaster.*" %>
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
<%@ page import="org.apache.tajo.worker.TajoWorker" %>
<%@ page import="java.text.NumberFormat" %>
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/resources/webapps/worker/task.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/task.jsp b/tajo-core/src/main/resources/webapps/worker/task.jsp
index 81b1e6d..17e884a 100644
--- a/tajo-core/src/main/resources/webapps/worker/task.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/task.jsp
@@ -25,10 +25,10 @@
<%@ page import="org.apache.tajo.catalog.proto.CatalogProtos" %>
<%@ page import="org.apache.tajo.catalog.statistics.TableStats" %>
<%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %>
-<%@ page import="org.apache.tajo.master.querymaster.Query" %>
-<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %>
-<%@ page import="org.apache.tajo.master.querymaster.Task" %>
-<%@ page import="org.apache.tajo.master.querymaster.Stage" %>
+<%@ page import="org.apache.tajo.querymaster.Query" %>
+<%@ page import="org.apache.tajo.querymaster.QueryMasterTask" %>
+<%@ page import="org.apache.tajo.querymaster.Task" %>
+<%@ page import="org.apache.tajo.querymaster.Stage" %>
<%@ page import="org.apache.tajo.storage.DataLocation" %>
<%@ page import="org.apache.tajo.storage.fragment.FileFragment" %>
<%@ page import="org.apache.tajo.storage.fragment.FragmentConvertor" %>
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
index 8bee6fb..e464446 100644
--- a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
+++ b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
@@ -35,7 +35,7 @@ import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.master.session.Session;
+import org.apache.tajo.session.Session;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.FileUtil;
import org.apache.tajo.util.KeyValueSet;
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 0d2f6fa..0786912 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -44,10 +44,10 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider;
import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.querymaster.*;
-import org.apache.tajo.master.querymaster.Query;
-import org.apache.tajo.master.querymaster.Stage;
-import org.apache.tajo.master.querymaster.StageState;
+import org.apache.tajo.querymaster.*;
+import org.apache.tajo.querymaster.Query;
+import org.apache.tajo.querymaster.Stage;
+import org.apache.tajo.querymaster.StageState;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.plan.rewrite.LogicalPlanTestRuleProvider;
import org.apache.tajo.util.CommonTestingUtil;
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index 889d61c..0b59bc7 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -37,7 +37,7 @@ import org.apache.tajo.engine.function.builtin.SumInt;
import org.apache.tajo.engine.json.CoreGsonHelper;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.master.session.Session;
+import org.apache.tajo.session.Session;
import org.apache.tajo.plan.*;
import org.apache.tajo.plan.expr.*;
import org.apache.tajo.plan.logical.*;
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 6db76ae..d3ab1fd 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -44,7 +44,7 @@ import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.master.session.Session;
+import org.apache.tajo.session.Session;
import org.apache.tajo.plan.LogicalOptimizer;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
index 1a212b0..d1756e1 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
@@ -24,10 +24,10 @@ import org.apache.tajo.*;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.master.querymaster.Query;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
-import org.apache.tajo.master.querymaster.Task;
-import org.apache.tajo.master.querymaster.Stage;
+import org.apache.tajo.querymaster.Query;
+import org.apache.tajo.querymaster.QueryMasterTask;
+import org.apache.tajo.querymaster.Task;
+import org.apache.tajo.querymaster.Stage;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.TUtil;
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
index 68b3fb3..39b58d0 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
@@ -32,7 +32,7 @@ import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.jdbc.TajoResultSet;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.FileUtil;
import org.apache.tajo.util.KeyValueSet;
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
index 3400752..cacef96 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -38,7 +38,7 @@ import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.jdbc.TajoResultSet;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.util.CommonTestingUtil;