You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/01/08 16:36:15 UTC
[07/13] tajo git commit: TAJO-1288: Refactoring
org.apache.tajo.master package.
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java
deleted file mode 100644
index 0f161ff..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java
+++ /dev/null
@@ -1,443 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.state.*;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.TajoProtos.TaskAttemptState;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
-import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
-import org.apache.tajo.master.querymaster.Task.IntermediateEntry;
-import org.apache.tajo.master.querymaster.Task.PullHost;
-import org.apache.tajo.master.container.TajoContainerId;
-
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput;
-
-public class TaskAttempt implements EventHandler<TaskAttemptEvent> {
-
- private static final Log LOG = LogFactory.getLog(TaskAttempt.class);
-
- private final static int EXPIRE_TIME = 15000;
-
- private final TaskAttemptId id;
- private final Task task;
- final EventHandler eventHandler;
-
- private TajoContainerId containerId;
- private WorkerConnectionInfo workerConnectionInfo;
- private int expire;
-
- private final Lock readLock;
- private final Lock writeLock;
-
- private final List<String> diagnostics = new ArrayList<String>();
-
- private final TaskAttemptScheduleContext scheduleContext;
-
- private float progress;
- private CatalogProtos.TableStatsProto inputStats;
- private CatalogProtos.TableStatsProto resultStats;
-
- protected static final StateMachineFactory
- <TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
- stateMachineFactory = new StateMachineFactory
- <TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
- (TaskAttemptState.TA_NEW)
-
- // Transitions from TA_NEW state
- .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
- TaskAttemptEventType.TA_SCHEDULE, new TaskAttemptScheduleTransition())
- .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
- TaskAttemptEventType.TA_RESCHEDULE, new TaskAttemptScheduleTransition())
- .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_KILLED,
- TaskAttemptEventType.TA_KILL,
- new TaskKilledCompleteTransition())
-
- // Transitions from TA_UNASSIGNED state
- .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_ASSIGNED,
- TaskAttemptEventType.TA_ASSIGNED,
- new LaunchTransition())
- .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_KILL_WAIT,
- TaskAttemptEventType.TA_KILL,
- new KillUnassignedTaskTransition())
-
- // Transitions from TA_ASSIGNED state
- .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_ASSIGNED,
- TaskAttemptEventType.TA_ASSIGNED, new AlreadyAssignedTransition())
- .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILL_WAIT,
- TaskAttemptEventType.TA_KILL,
- new KillTaskTransition())
- .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILLED,
- TaskAttemptEventType.TA_KILL,
- new KillTaskTransition())
- .addTransition(TaskAttemptState.TA_ASSIGNED,
- EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED),
- TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
- .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_SUCCEEDED,
- TaskAttemptEventType.TA_DONE, new SucceededTransition())
- .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_FAILED,
- TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
-
- // Transitions from TA_RUNNING state
- .addTransition(TaskAttemptState.TA_RUNNING,
- EnumSet.of(TaskAttemptState.TA_RUNNING),
- TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
- .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILL_WAIT,
- TaskAttemptEventType.TA_KILL,
- new KillTaskTransition())
- .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_SUCCEEDED,
- TaskAttemptEventType.TA_DONE, new SucceededTransition())
- .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_FAILED,
- TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
-
- .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
- TaskAttemptEventType.TA_LOCAL_KILLED,
- new TaskKilledCompleteTransition())
- .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
- TaskAttemptEventType.TA_ASSIGNED,
- new KillTaskTransition())
- .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
- TaskAttemptEventType.TA_SCHEDULE_CANCELED,
- new TaskKilledCompleteTransition())
- .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
- TaskAttemptEventType.TA_DONE,
- new TaskKilledCompleteTransition())
- .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_FAILED,
- TaskAttemptEventType.TA_FATAL_ERROR)
- .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
- EnumSet.of(
- TaskAttemptEventType.TA_KILL,
- TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
- TaskAttemptEventType.TA_UPDATE))
-
- // Transitions from TA_SUCCEEDED state
- .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
- TaskAttemptEventType.TA_UPDATE)
- .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
- TaskAttemptEventType.TA_DONE, new AlreadyDoneTransition())
- .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_FAILED,
- TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
- // Ignore-able transitions
- .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
- TaskAttemptEventType.TA_KILL)
-
- // Transitions from TA_KILLED state
- .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
- TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE)
- // Ignore-able transitions
- .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
- EnumSet.of(
- TaskAttemptEventType.TA_UPDATE))
- .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
- EnumSet.of(
- TaskAttemptEventType.TA_LOCAL_KILLED,
- TaskAttemptEventType.TA_KILL,
- TaskAttemptEventType.TA_ASSIGNED,
- TaskAttemptEventType.TA_DONE),
- new TaskKilledCompleteTransition())
- .installTopology();
-
- private final StateMachine<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
- stateMachine;
-
-
- public TaskAttempt(final TaskAttemptScheduleContext scheduleContext,
- final TaskAttemptId id, final Task task,
- final EventHandler eventHandler) {
- this.scheduleContext = scheduleContext;
- this.id = id;
- this.expire = TaskAttempt.EXPIRE_TIME;
- this.task = task;
- this.eventHandler = eventHandler;
-
- ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- this.readLock = readWriteLock.readLock();
- this.writeLock = readWriteLock.writeLock();
-
- stateMachine = stateMachineFactory.make(this);
- }
-
- public TaskAttemptState getState() {
- readLock.lock();
- try {
- return stateMachine.getCurrentState();
- } finally {
- readLock.unlock();
- }
- }
-
- public TaskAttemptId getId() {
- return this.id;
- }
-
- public boolean isLeafTask() {
- return this.task.isLeafTask();
- }
-
- public Task getTask() {
- return this.task;
- }
-
- public WorkerConnectionInfo getWorkerConnectionInfo() {
- return this.workerConnectionInfo;
- }
-
- public void setContainerId(TajoContainerId containerId) {
- this.containerId = containerId;
- }
-
- public synchronized void setExpireTime(int expire) {
- this.expire = expire;
- }
-
- public synchronized void updateExpireTime(int period) {
- this.setExpireTime(this.expire - period);
- }
-
- public synchronized void resetExpireTime() {
- this.setExpireTime(TaskAttempt.EXPIRE_TIME);
- }
-
- public int getLeftTime() {
- return this.expire;
- }
-
- public float getProgress() {
- return progress;
- }
-
- public TableStats getInputStats() {
- if (inputStats == null) {
- return null;
- }
-
- return new TableStats(inputStats);
- }
-
- public TableStats getResultStats() {
- if (resultStats == null) {
- return null;
- }
- return new TableStats(resultStats);
- }
-
- private void fillTaskStatistics(TaskCompletionReport report) {
- this.progress = 1.0f;
-
- List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
-
- if (report.getShuffleFileOutputsCount() > 0) {
- this.getTask().setShuffleFileOutputs(report.getShuffleFileOutputsList());
-
- PullHost host = new PullHost(getWorkerConnectionInfo().getHost(), getWorkerConnectionInfo().getPullServerPort());
- for (ShuffleFileOutput p : report.getShuffleFileOutputsList()) {
- IntermediateEntry entry = new IntermediateEntry(getId().getTaskId().getId(),
- getId().getId(), p.getPartId(), host, p.getVolume());
- partitions.add(entry);
- }
- }
- this.getTask().setIntermediateData(partitions);
-
- if (report.hasInputStats()) {
- this.inputStats = report.getInputStats();
- }
- if (report.hasResultStats()) {
- this.resultStats = report.getResultStats();
- this.getTask().setStats(new TableStats(resultStats));
- }
- }
-
- private static class TaskAttemptScheduleTransition implements
- SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
-
- @Override
- public void transition(TaskAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
- taskAttempt.eventHandler.handle(new TaskAttemptToSchedulerEvent(
- EventType.T_SCHEDULE, taskAttempt.getTask().getId().getExecutionBlockId(),
- taskAttempt.scheduleContext, taskAttempt));
- }
- }
-
- private static class KillUnassignedTaskTransition implements
- SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
-
- @Override
- public void transition(TaskAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
- taskAttempt.eventHandler.handle(new TaskAttemptToSchedulerEvent(
- EventType.T_SCHEDULE_CANCEL, taskAttempt.getTask().getId().getExecutionBlockId(),
- taskAttempt.scheduleContext, taskAttempt));
- }
- }
-
- private static class LaunchTransition
- implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
-
- @Override
- public void transition(TaskAttempt taskAttempt,
- TaskAttemptEvent event) {
- TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event;
- taskAttempt.containerId = castEvent.getContainerId();
- taskAttempt.workerConnectionInfo = castEvent.getWorkerConnectionInfo();
- taskAttempt.eventHandler.handle(
- new TaskTAttemptEvent(taskAttempt.getId(),
- TaskEventType.T_ATTEMPT_LAUNCHED));
- }
- }
-
- private static class TaskKilledCompleteTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
-
- @Override
- public void transition(TaskAttempt taskAttempt,
- TaskAttemptEvent event) {
- taskAttempt.getTask().handle(new TaskEvent(taskAttempt.getId().getTaskId(),
- TaskEventType.T_ATTEMPT_KILLED));
- LOG.info(taskAttempt.getId() + " Received TA_KILLED Status from LocalTask");
- }
- }
-
- private static class StatusUpdateTransition
- implements MultipleArcTransition<TaskAttempt, TaskAttemptEvent, TaskAttemptState> {
-
- @Override
- public TaskAttemptState transition(TaskAttempt taskAttempt,
- TaskAttemptEvent event) {
- TaskAttemptStatusUpdateEvent updateEvent = (TaskAttemptStatusUpdateEvent) event;
-
- taskAttempt.progress = updateEvent.getStatus().getProgress();
- taskAttempt.inputStats = updateEvent.getStatus().getInputStats();
- taskAttempt.resultStats = updateEvent.getStatus().getResultStats();
-
- return TaskAttemptState.TA_RUNNING;
- }
- }
-
- private void addDiagnosticInfo(String diag) {
- if (diag != null && !diag.equals("")) {
- diagnostics.add(diag);
- }
- }
-
- private static class AlreadyAssignedTransition
- implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{
-
- @Override
- public void transition(TaskAttempt taskAttempt,
- TaskAttemptEvent taskAttemptEvent) {
- }
- }
-
- private static class AlreadyDoneTransition
- implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{
-
- @Override
- public void transition(TaskAttempt taskAttempt,
- TaskAttemptEvent taskAttemptEvent) {
- }
- }
-
- private static class SucceededTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
- @Override
- public void transition(TaskAttempt taskAttempt,
- TaskAttemptEvent event) {
- TaskCompletionReport report = ((TaskCompletionEvent)event).getReport();
-
- try {
- taskAttempt.fillTaskStatistics(report);
- taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_SUCCEEDED));
- } catch (Throwable t) {
- taskAttempt.eventHandler.handle(new TaskFatalErrorEvent(taskAttempt.getId(), t.getMessage()));
- taskAttempt.addDiagnosticInfo(ExceptionUtils.getStackTrace(t));
- }
- }
- }
-
- private static class KillTaskTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
-
- @Override
- public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) {
- taskAttempt.eventHandler.handle(new LocalTaskEvent(taskAttempt.getId(), taskAttempt.containerId,
- LocalTaskEventType.KILL));
- }
- }
-
- private static class FailedTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{
- @Override
- public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) {
- TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event;
- taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED));
- taskAttempt.addDiagnosticInfo(errorEvent.errorMessage());
- LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getWorkerConnectionInfo().getHost()
- + " >> " + errorEvent.errorMessage());
- }
- }
-
- @Override
- public void handle(TaskAttemptEvent event) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing " + event.getTaskAttemptId() + " of type " + event.getType());
- }
- try {
- writeLock.lock();
- TaskAttemptState oldState = getState();
- try {
- stateMachine.doTransition(event.getType(), event);
- } catch (InvalidStateTransitonException e) {
- LOG.error("Can't handle this event at current state of " + event.getTaskAttemptId() + ")"
- + ", eventType:" + event.getType().name()
- + ", oldState:" + oldState.name()
- + ", nextState:" + getState().name()
- , e);
- eventHandler.handle(
- new StageDiagnosticsUpdateEvent(event.getTaskAttemptId().getTaskId().getExecutionBlockId(),
- "Can't handle this event at current state of " + event.getTaskAttemptId() + ")"));
- eventHandler.handle(
- new StageEvent(event.getTaskAttemptId().getTaskId().getExecutionBlockId(),
- StageEventType.SQ_INTERNAL_ERROR));
- }
-
- //notify the eventhandler of state change
- if (LOG.isDebugEnabled()) {
- if (oldState != getState()) {
- LOG.debug(id + " TaskAttempt Transitioned from " + oldState + " to "
- + getState());
- }
- }
- }
-
- finally {
- writeLock.unlock();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index f1a9224..c4200d5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -36,10 +36,9 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.ContainerProtocol;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.querymaster.QueryInProgress;
+import org.apache.tajo.querymaster.QueryInProgress;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.util.ApplicationIdUtils;
-import org.apache.tajo.util.StringUtils;
import java.io.IOException;
import java.util.*;
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index 9c2b71b..b237cc5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.service.Service;
import org.apache.tajo.QueryId;
import org.apache.tajo.ipc.ContainerProtocol;
import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.querymaster.QueryInProgress;
+import org.apache.tajo.querymaster.QueryInProgress;
import java.io.IOException;
import java.util.Collection;
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QuerySchedulingInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QuerySchedulingInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QuerySchedulingInfo.java
new file mode 100644
index 0000000..3dd3389
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QuerySchedulingInfo.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.scheduler;
+
+import com.google.common.base.Objects;
+import org.apache.tajo.QueryId;
+
+public class QuerySchedulingInfo {
+ private QueryId queryId;
+ private Integer priority;
+ private Long startTime;
+
+ public QuerySchedulingInfo(QueryId queryId, Integer priority, Long startTime) {
+ this.queryId = queryId;
+ this.priority = priority;
+ this.startTime = startTime;
+ }
+
+ public QueryId getQueryId() {
+ return queryId;
+ }
+
+ public Integer getPriority() {
+ return priority;
+ }
+
+ public Long getStartTime() {
+ return startTime;
+ }
+
+ public String getName() {
+ return queryId.getId();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(startTime, getName(), priority);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
new file mode 100644
index 0000000..02203a9
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.scheduler;
+
+import org.apache.tajo.QueryId;
+import org.apache.tajo.querymaster.QueryInProgress;
+
+import java.util.List;
+
+public interface Scheduler {
+
+ public Mode getMode();
+
+ public String getName();
+
+ public boolean addQuery(QueryInProgress resource);
+
+ public boolean removeQuery(QueryId queryId);
+
+ public List<QueryInProgress> getRunningQueries();
+
+ public enum Mode {
+ FIFO
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SchedulingAlgorithms.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SchedulingAlgorithms.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SchedulingAlgorithms.java
new file mode 100644
index 0000000..7fd07b5
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SchedulingAlgorithms.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.scheduler;
+
+import java.util.Comparator;
+
+/**
+ * Utility class containing scheduling algorithms used in the scheduler.
+ */
+
+public class SchedulingAlgorithms {
+ /**
+ * Compare Schedulables in order of priority and then submission time, as in
+ * the default FIFO scheduler in Tajo.
+ */
+ public static class FifoComparator implements Comparator<QuerySchedulingInfo> {
+ @Override
+ public int compare(QuerySchedulingInfo q1, QuerySchedulingInfo q2) {
+ int res = q1.getPriority().compareTo(q2.getPriority());
+ if (res == 0) {
+ res = (int) Math.signum(q1.getStartTime() - q2.getStartTime());
+ }
+ if (res == 0) {
+ // In the rare case where jobs were submitted at the exact same time,
+ // compare them by name (which will be the QueryId) to get a deterministic ordering
+ res = q1.getName().compareTo(q2.getName());
+ }
+ return res;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
new file mode 100644
index 0000000..bd8ca28
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.scheduler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.querymaster.QueryInProgress;
+import org.apache.tajo.master.QueryJobManager;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SimpleFifoScheduler implements Scheduler {
+ private static final Log LOG = LogFactory.getLog(SimpleFifoScheduler.class.getName());
+ private LinkedList<QuerySchedulingInfo> pool = new LinkedList<QuerySchedulingInfo>();
+ private final Thread queryProcessor;
+ private AtomicBoolean stopped = new AtomicBoolean();
+ private QueryJobManager manager;
+ private Comparator<QuerySchedulingInfo> COMPARATOR = new SchedulingAlgorithms.FifoComparator();
+
+ public SimpleFifoScheduler(QueryJobManager manager) {
+ this.manager = manager;
+ this.queryProcessor = new Thread(new QueryProcessor());
+ this.queryProcessor.setName("Query Processor");
+ }
+
+ @Override
+ public Mode getMode() {
+ return Mode.FIFO;
+ }
+
+ @Override
+ public String getName() {
+ return manager.getName();
+ }
+
+ @Override
+ public boolean addQuery(QueryInProgress queryInProgress) {
+ int qSize = pool.size();
+ if (qSize != 0 && qSize % 100 == 0) {
+ LOG.info("Size of Fifo queue is " + qSize);
+ }
+
+ QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1, queryInProgress.getStartTime());
+ boolean result = pool.add(querySchedulingInfo);
+ if (getRunningQueries().size() == 0) wakeupProcessor();
+ return result;
+ }
+
+ @Override
+ public boolean removeQuery(QueryId queryId) {
+ return pool.remove(getQueryByQueryId(queryId));
+ }
+
+ public QuerySchedulingInfo getQueryByQueryId(QueryId queryId) {
+ for (QuerySchedulingInfo querySchedulingInfo : pool) {
+ if (querySchedulingInfo.getQueryId().equals(queryId)) {
+ return querySchedulingInfo;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public List<QueryInProgress> getRunningQueries() {
+ return new ArrayList<QueryInProgress>(manager.getRunningQueries());
+ }
+
+ public void start() {
+ queryProcessor.start();
+ }
+
+ public void stop() {
+ if (stopped.getAndSet(true)) {
+ return;
+ }
+ pool.clear();
+ synchronized (queryProcessor) {
+ queryProcessor.interrupt();
+ }
+ }
+
+ private QuerySchedulingInfo pollScheduledQuery() {
+ if (pool.size() > 1) {
+ Collections.sort(pool, COMPARATOR);
+ }
+ return pool.poll();
+ }
+
+ private void wakeupProcessor() {
+ synchronized (queryProcessor) {
+ queryProcessor.notifyAll();
+ }
+ }
+
+ private final class QueryProcessor implements Runnable {
+ @Override
+ public void run() {
+
+ QuerySchedulingInfo query;
+
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+ query = null;
+ if (getRunningQueries().size() == 0) {
+ query = pollScheduledQuery();
+ }
+
+ if (query != null) {
+ try {
+ manager.startQueryJob(query.getQueryId());
+ } catch (Throwable t) {
+ LOG.fatal("Exception during query startup:", t);
+ manager.stopQuery(query.getQueryId());
+ }
+ }
+
+ synchronized (queryProcessor) {
+ try {
+ queryProcessor.wait(500);
+ } catch (InterruptedException e) {
+ if (stopped.get()) {
+ break;
+ }
+ LOG.warn("Exception during shutdown: ", e);
+ }
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java b/tajo-core/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java
deleted file mode 100644
index 3f48ca5..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.session;
-
-public class InvalidSessionException extends Exception {
- public InvalidSessionException(String sessionId) {
- super("Invalid session id \"" + sessionId + "\"");
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java b/tajo-core/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java
deleted file mode 100644
index 686d860..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.session;
-
-public class NoSuchSessionVariableException extends Exception {
- public NoSuchSessionVariableException(String varname) {
- super("No such session variable \"" + varname + "\"");
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
deleted file mode 100644
index 5f44ecb..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.session;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.SessionVars;
-import org.apache.tajo.master.NonForwardQueryResultScanner;
-import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.common.ProtoObject;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.tajo.ipc.TajoWorkerProtocol.SessionProto;
-
-public class Session implements SessionConstants, ProtoObject<SessionProto>, Cloneable {
- private static final Log LOG = LogFactory.getLog(Session.class);
-
- private final String sessionId;
- private final String userName;
- private String currentDatabase;
- private final Map<String, String> sessionVariables;
- private final Map<QueryId, NonForwardQueryResultScanner> nonForwardQueryMap = new HashMap<QueryId, NonForwardQueryResultScanner>();
-
- // transient status
- private volatile long lastAccessTime;
-
- public Session(String sessionId, String userName, String databaseName) {
- this.sessionId = sessionId;
- this.userName = userName;
- this.currentDatabase = databaseName;
- this.lastAccessTime = System.currentTimeMillis();
-
- this.sessionVariables = new HashMap<String, String>();
- sessionVariables.put(SessionVars.SESSION_ID.keyname(), sessionId);
- sessionVariables.put(SessionVars.USERNAME.keyname(), userName);
- selectDatabase(databaseName);
- }
-
- public Session(SessionProto proto) {
- sessionId = proto.getSessionId();
- userName = proto.getUsername();
- currentDatabase = proto.getCurrentDatabase();
- lastAccessTime = proto.getLastAccessTime();
- KeyValueSet keyValueSet = new KeyValueSet(proto.getVariables());
- sessionVariables = keyValueSet.getAllKeyValus();
- }
-
- public String getSessionId() {
- return sessionId;
- }
-
- public String getUserName() {
- return userName;
- }
-
- public void updateLastAccessTime() {
- lastAccessTime = System.currentTimeMillis();
- }
-
- public long getLastAccessTime() {
- return lastAccessTime;
- }
-
- public void setVariable(String name, String value) {
- synchronized (sessionVariables) {
- sessionVariables.put(SessionVars.handleDeprecatedName(name), value);
- }
- }
-
- public String getVariable(String name) throws NoSuchSessionVariableException {
- synchronized (sessionVariables) {
- if (sessionVariables.containsKey(name)) {
- return sessionVariables.get(SessionVars.handleDeprecatedName(name));
- } else {
- throw new NoSuchSessionVariableException(name);
- }
- }
- }
-
- public void removeVariable(String name) {
- synchronized (sessionVariables) {
- sessionVariables.remove(SessionVars.handleDeprecatedName(name));
- }
- }
-
- public synchronized Map<String, String> getAllVariables() {
- synchronized (sessionVariables) {
- sessionVariables.put(SessionVars.SESSION_ID.keyname(), sessionId);
- sessionVariables.put(SessionVars.USERNAME.keyname(), userName);
- sessionVariables.put(SessionVars.SESSION_LAST_ACCESS_TIME.keyname(), String.valueOf(lastAccessTime));
- sessionVariables.put(SessionVars.CURRENT_DATABASE.keyname(), currentDatabase);
- return ImmutableMap.copyOf(sessionVariables);
- }
- }
-
- public synchronized void selectDatabase(String databaseName) {
- this.currentDatabase = databaseName;
- }
-
- public synchronized String getCurrentDatabase() {
- return currentDatabase;
- }
-
- @Override
- public SessionProto getProto() {
- SessionProto.Builder builder = SessionProto.newBuilder();
- builder.setSessionId(getSessionId());
- builder.setUsername(getUserName());
- builder.setCurrentDatabase(getCurrentDatabase());
- builder.setLastAccessTime(lastAccessTime);
- KeyValueSet variables = new KeyValueSet();
-
- synchronized (sessionVariables) {
- variables.putAll(this.sessionVariables);
- builder.setVariables(variables.getProto());
- return builder.build();
- }
- }
-
- public String toString() {
- return "user=" + getUserName() + ",id=" + getSessionId() +",last_atime=" + getLastAccessTime();
- }
-
- public Session clone() throws CloneNotSupportedException {
- Session newSession = (Session) super.clone();
- newSession.sessionVariables.putAll(getAllVariables());
- return newSession;
- }
-
- public NonForwardQueryResultScanner getNonForwardQueryResultScanner(QueryId queryId) {
- synchronized (nonForwardQueryMap) {
- return nonForwardQueryMap.get(queryId);
- }
- }
-
- public void addNonForwardQueryResultScanner(NonForwardQueryResultScanner resultScanner) {
- synchronized (nonForwardQueryMap) {
- nonForwardQueryMap.put(resultScanner.getQueryId(), resultScanner);
- }
- }
-
- public void closeNonForwardQueryResultScanner(QueryId queryId) {
- NonForwardQueryResultScanner resultScanner;
- synchronized (nonForwardQueryMap) {
- resultScanner = nonForwardQueryMap.remove(queryId);
- }
-
- if (resultScanner != null) {
- try {
- resultScanner.close();
- } catch (Exception e) {
- LOG.error("NonForwardQueryResultScanne close error: " + e.getMessage(), e);
- }
- }
- }
-
- public void close() {
- try {
- synchronized (nonForwardQueryMap) {
- for (NonForwardQueryResultScanner eachQueryScanner: nonForwardQueryMap.values()) {
- try {
- eachQueryScanner.close();
- } catch (Exception e) {
- LOG.error("Error while closing NonForwardQueryResultScanner: " +
- eachQueryScanner.getSessionId() + ", " + e.getMessage(), e);
- }
- }
-
- nonForwardQueryMap.clear();
- }
- } catch (Throwable t) {
- LOG.error(t.getMessage(), t);
- throw new RuntimeException(t.getMessage(), t);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/SessionConstants.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionConstants.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionConstants.java
deleted file mode 100644
index 46f49a2..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionConstants.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.session;
-
-public interface SessionConstants {
-
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEvent.java
deleted file mode 100644
index dce3ba6..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEvent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.session;
-
-import org.apache.hadoop.yarn.event.AbstractEvent;
-
-public class SessionEvent extends AbstractEvent<SessionEventType> {
- private final String sessionId;
-
- public SessionEvent(String sessionId, SessionEventType sessionEventType) {
- super(sessionEventType);
- this.sessionId = sessionId;
- }
-
- public String getSessionId() {
- return sessionId;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEventType.java
deleted file mode 100644
index 64c6fc6..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEventType.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.session;
-
-public enum SessionEventType {
- EXPIRE,
- PING
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java
deleted file mode 100644
index 912f769..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.session;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tajo.conf.TajoConf;
-
-public class SessionLivelinessMonitor extends AbstractLivelinessMonitor<String> {
-
- private EventHandler dispatcher;
-
- public SessionLivelinessMonitor(Dispatcher d) {
- super(SessionLivelinessMonitor.class.getSimpleName(), new SystemClock());
- this.dispatcher = d.getEventHandler();
- }
-
- public void serviceInit(Configuration conf) throws Exception {
- Preconditions.checkArgument(conf instanceof TajoConf);
- TajoConf systemConf = (TajoConf) conf;
-
- // seconds
- int expireIntvl = systemConf.getIntVar(TajoConf.ConfVars.$CLIENT_SESSION_EXPIRY_TIME);
- setExpireInterval(expireIntvl);
- setMonitorInterval(expireIntvl / 3);
- super.serviceInit(conf);
- }
-
- @Override
- protected void expire(String id) {
- dispatcher.handle(new SessionEvent(id, SessionEventType.EXPIRE));
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java
deleted file mode 100644
index d701d03..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.session;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class SessionManager extends CompositeService implements EventHandler<SessionEvent> {
- private static final Log LOG = LogFactory.getLog(SessionManager.class);
-
- public final ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>();
- private final Dispatcher dispatcher;
- private SessionLivelinessMonitor sessionLivelinessMonitor;
-
-
- public SessionManager(Dispatcher dispatcher) {
- super(SessionManager.class.getSimpleName());
- this.dispatcher = dispatcher;
- }
-
- @Override
- public void serviceInit(Configuration conf) throws Exception {
- sessionLivelinessMonitor = new SessionLivelinessMonitor(dispatcher);
- addIfService(sessionLivelinessMonitor);
- super.serviceInit(conf);
- }
-
- @Override
- public void serviceStop() throws Exception {
- super.serviceStop();
- }
-
- private void assertSessionExistence(String sessionId) throws InvalidSessionException {
- if (!sessions.containsKey(sessionId)) {
- throw new InvalidSessionException(sessionId);
- }
- }
-
- public String createSession(String username, String baseDatabaseName) throws InvalidSessionException {
- String sessionId;
- Session oldSession;
-
- sessionId = UUID.randomUUID().toString();
- Session newSession = new Session(sessionId, username, baseDatabaseName);
- oldSession = sessions.putIfAbsent(sessionId, newSession);
- if (oldSession != null) {
- throw new InvalidSessionException("Session id is duplicated: " + oldSession.getSessionId());
- }
- LOG.info("Session " + sessionId + " is created." );
- return sessionId;
- }
-
- public Session removeSession(String sessionId) {
- if (sessions.containsKey(sessionId)) {
- LOG.info("Session " + sessionId + " is removed.");
- Session session = sessions.remove(sessionId);
- session.close();
- return session;
- } else {
- LOG.error("No such session id: " + sessionId);
- return null;
- }
- }
-
- public Session getSession(String sessionId) throws InvalidSessionException {
- assertSessionExistence(sessionId);
- touch(sessionId);
- return sessions.get(sessionId);
- }
-
- public void setVariable(String sessionId, String name, String value) throws InvalidSessionException {
- assertSessionExistence(sessionId);
- touch(sessionId);
- sessions.get(sessionId).setVariable(name, value);
- }
-
- public String getVariable(String sessionId, String name)
- throws InvalidSessionException, NoSuchSessionVariableException {
- assertSessionExistence(sessionId);
- touch(sessionId);
- return sessions.get(sessionId).getVariable(name);
- }
-
- public void removeVariable(String sessionId, String name) throws InvalidSessionException {
- assertSessionExistence(sessionId);
- touch(sessionId);
- sessions.get(sessionId).removeVariable(name);
- }
-
- public Map<String, String> getAllVariables(String sessionId) throws InvalidSessionException {
- assertSessionExistence(sessionId);
- touch(sessionId);
- return sessions.get(sessionId).getAllVariables();
- }
-
- public void touch(String sessionId) throws InvalidSessionException {
- assertSessionExistence(sessionId);
- sessions.get(sessionId).updateLastAccessTime();
- sessionLivelinessMonitor.receivedPing(sessionId);
- }
-
- @Override
- public void handle(SessionEvent event) {
- LOG.info("Processing " + event.getSessionId() + " of type " + event.getType());
-
- try {
- assertSessionExistence(event.getSessionId());
- touch(event.getSessionId());
- } catch (InvalidSessionException e) {
- LOG.error(e);
- }
-
- if (event.getType() == SessionEventType.EXPIRE) {
- Session session = removeSession(event.getSessionId());
- if (session != null) {
- LOG.info("[Expired] Session username=" + session.getUserName() + ",sessionid=" + event.getSessionId());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/metrics/CatalogMetricsGaugeSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/metrics/CatalogMetricsGaugeSet.java b/tajo-core/src/main/java/org/apache/tajo/metrics/CatalogMetricsGaugeSet.java
new file mode 100644
index 0000000..82ebe29
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/metrics/CatalogMetricsGaugeSet.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+import org.apache.tajo.master.TajoMaster;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+
+public class CatalogMetricsGaugeSet implements MetricSet {
+ TajoMaster.MasterContext tajoMasterContext;
+ public CatalogMetricsGaugeSet(TajoMaster.MasterContext tajoMasterContext) {
+ this.tajoMasterContext = tajoMasterContext;
+ }
+
+ @Override
+ public Map<String, Metric> getMetrics() {
+ Map<String, Metric> metricsMap = new HashMap<String, Metric>();
+ metricsMap.put("numTables", new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return tajoMasterContext.getCatalog().getAllTableNames(DEFAULT_DATABASE_NAME).size();
+ }
+ });
+
+ metricsMap.put("numFunctions", new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return tajoMasterContext.getCatalog().getFunctions().size();
+ }
+ });
+
+ return metricsMap;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/metrics/WorkerResourceMetricsGaugeSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/metrics/WorkerResourceMetricsGaugeSet.java b/tajo-core/src/main/java/org/apache/tajo/metrics/WorkerResourceMetricsGaugeSet.java
new file mode 100644
index 0000000..229a80a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/metrics/WorkerResourceMetricsGaugeSet.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.master.rm.WorkerState;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class WorkerResourceMetricsGaugeSet implements MetricSet {
+ TajoMaster.MasterContext tajoMasterContext;
+ public WorkerResourceMetricsGaugeSet(TajoMaster.MasterContext tajoMasterContext) {
+ this.tajoMasterContext = tajoMasterContext;
+ }
+
+ @Override
+ public Map<String, Metric> getMetrics() {
+ Map<String, Metric> metricsMap = new HashMap<String, Metric>();
+ metricsMap.put("totalWorkers", new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return tajoMasterContext.getResourceManager().getWorkers().size();
+ }
+ });
+
+ metricsMap.put("liveWorkers", new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return getNumWorkers(WorkerState.RUNNING);
+ }
+ });
+
+ metricsMap.put("deadWorkers", new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return getNumWorkers(WorkerState.LOST);
+ }
+ });
+
+ return metricsMap;
+ }
+
+ protected int getNumWorkers(WorkerState status) {
+ int numWorkers = 0;
+ for(Worker eachWorker: tajoMasterContext.getResourceManager().getWorkers().values()) {
+ if(eachWorker.getState() == status) {
+ numWorkers++;
+ }
+ }
+
+ return numWorkers;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java
new file mode 100644
index 0000000..e45f274
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.master.event.TaskRequestEvent;
+import org.apache.tajo.master.event.TaskSchedulerEvent;
+
+
+public abstract class AbstractTaskScheduler extends AbstractService implements EventHandler<TaskSchedulerEvent> {
+
+ protected int hostLocalAssigned;
+ protected int rackLocalAssigned;
+ protected int totalAssigned;
+
+ /**
+ * Construct the service.
+ *
+ * @param name service name
+ */
+ public AbstractTaskScheduler(String name) {
+ super(name);
+ }
+
+ public int getHostLocalAssigned() {
+ return hostLocalAssigned;
+ }
+
+ public int getRackLocalAssigned() {
+ return rackLocalAssigned;
+ }
+
+ public int getTotalAssigned() {
+ return totalAssigned;
+ }
+
+ public abstract void handleTaskRequestEvent(TaskRequestEvent event);
+ public abstract int remainingScheduledObjectNum();
+}