You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 11:19:43 UTC
[21/51] [partial] TAJO-752: Escalate sub modules in tajo-core into
the top-level modules. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
new file mode 100644
index 0000000..15ac6b6
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -0,0 +1,537 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.protobuf.RpcCallback;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.querymaster.QueryInProgress;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.util.ApplicationIdUtils;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import static org.apache.tajo.ipc.TajoMasterProtocol.*;
+
+
+/**
+ * Resource manager that tracks every Tajo worker and hands out containers
+ * (memory and disk slots) to query masters and tasks.
+ *
+ * <p>Allocation requests are queued by {@link #allocateWorkerResources} and served
+ * asynchronously by a single background thread ({@link WorkerResourceAllocationThread}).
+ * Worker membership and liveness are driven by {@link TajoResourceTracker} and
+ * {@link WorkerLivelinessMonitor} through events sent on an {@link AsyncDispatcher}.
+ */
+public class TajoWorkerResourceManager extends CompositeService implements WorkerResourceManager {
+  /** class logger */
+  private static final Log LOG = LogFactory.getLog(TajoWorkerResourceManager.class);
+
+  /** Source of container ids; static, so it is shared by all instances in this JVM. */
+  static AtomicInteger containerIdSeq = new AtomicInteger(0);
+
+  private TajoMaster.MasterContext masterContext;
+
+  private TajoRMContext rmContext;
+
+  /** Prefix of query ids, generated once in {@link #serviceInit}. */
+  private String queryIdSeed;
+
+  /** Background thread that serves {@link #requestQueue}. */
+  private WorkerResourceAllocationThread workerResourceAllocator;
+
+  /**
+   * Worker liveliness monitor; produces EXPIRE events for workers whose latest heartbeat is too old.
+   */
+  private WorkerLivelinessMonitor workerLivelinessMonitor;
+
+  /** Pending allocation requests, consumed by {@link #workerResourceAllocator}. */
+  private BlockingQueue<WorkerResourceRequest> requestQueue;
+
+  private final AtomicBoolean stopped = new AtomicBoolean(false);
+
+  private TajoConf systemConf;
+
+  /** Resources reserved for each handed-out container id; an entry is removed when it is released. */
+  private final ConcurrentMap<ContainerIdProto, AllocatedWorkerResource> allocatedResourceMap =
+      Maps.newConcurrentMap();
+
+  /** It receives status messages from workers and their resources. */
+  private TajoResourceTracker resourceTracker;
+
+  public TajoWorkerResourceManager(TajoMaster.MasterContext masterContext) {
+    super(TajoWorkerResourceManager.class.getSimpleName());
+    this.masterContext = masterContext;
+  }
+
+  /**
+   * Creates a resource manager without a master context.
+   * {@link #allocateQueryMaster} reads its configuration from the master context and therefore
+   * cannot be used on an instance created by this constructor.
+   *
+   * @param systemConf system configuration; replaced by the one passed to {@link #serviceInit}
+   */
+  public TajoWorkerResourceManager(TajoConf systemConf) {
+    super(TajoWorkerResourceManager.class.getSimpleName());
+    this.systemConf = systemConf;
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    Preconditions.checkArgument(conf instanceof TajoConf);
+    this.systemConf = (TajoConf) conf;
+
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    addIfService(dispatcher);
+
+    rmContext = new TajoRMContext(dispatcher);
+
+    this.queryIdSeed = String.valueOf(System.currentTimeMillis());
+
+    requestQueue = new LinkedBlockingDeque<WorkerResourceRequest>();
+
+    workerResourceAllocator = new WorkerResourceAllocationThread();
+    workerResourceAllocator.start();
+
+    this.workerLivelinessMonitor = new WorkerLivelinessMonitor(this.rmContext.getDispatcher());
+    addIfService(this.workerLivelinessMonitor);
+
+    // Register event handler for Workers
+    rmContext.getDispatcher().register(WorkerEventType.class, new WorkerEventDispatcher(rmContext));
+
+    resourceTracker = new TajoResourceTracker(rmContext, workerLivelinessMonitor);
+    addIfService(resourceTracker);
+
+    super.serviceInit(systemConf);
+  }
+
+  /**
+   * Routes a {@link WorkerEvent} to the active {@link Worker} with the event's worker id.
+   * Events for unknown (or no longer active) workers are dropped.
+   */
+  @InterfaceAudience.Private
+  public static final class WorkerEventDispatcher implements EventHandler<WorkerEvent> {
+
+    private final TajoRMContext rmContext;
+
+    public WorkerEventDispatcher(TajoRMContext rmContext) {
+      this.rmContext = rmContext;
+    }
+
+    @Override
+    public void handle(WorkerEvent event) {
+      String workerId = event.getWorkerId();
+      Worker node = this.rmContext.getWorkers().get(workerId);
+      if (node != null) {
+        try {
+          node.handle(event);
+        } catch (Throwable t) {
+          LOG.error("Error in handling event type " + event.getType() + " for node " + workerId, t);
+        }
+      }
+    }
+  }
+
+  /** @return an immutable snapshot of the active workers, keyed by worker id */
+  @Override
+  public Map<String, Worker> getWorkers() {
+    return ImmutableMap.copyOf(rmContext.getWorkers());
+  }
+
+  /** @return an immutable snapshot of the inactive workers, keyed by worker id */
+  @Override
+  public Map<String, Worker> getInactiveWorkers() {
+    return ImmutableMap.copyOf(rmContext.getInactiveWorkers());
+  }
+
+  /** @return a read-only view of the ids of workers registered as query masters */
+  public Collection<String> getQueryMasters() {
+    return Collections.unmodifiableSet(rmContext.getQueryMasterWorker());
+  }
+
+  @Override
+  public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
+    return resourceTracker.getClusterResourceSummary();
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    // Only the first caller performs the shutdown.
+    if (!stopped.compareAndSet(false, true)) {
+      return;
+    }
+    if (workerResourceAllocator != null) {
+      // Wakes the allocator out of take()/sleep(); it then observes 'stopped' and exits.
+      workerResourceAllocator.interrupt();
+    }
+
+    super.serviceStop();
+  }
+
+  /**
+   *
+   * @return The prefix of queryId. It is generated when a TajoMaster starts up.
+   */
+  @Override
+  public String getSeedQueryId() throws IOException {
+    return queryIdSeed;
+  }
+
+  @VisibleForTesting
+  TajoResourceTracker getResourceTracker() {
+    return resourceTracker;
+  }
+
+  /**
+   * Builds a single-container, memory-priority request sized by the configured
+   * query master memory and disk slot settings.
+   */
+  private WorkerResourceAllocationRequest createQMResourceRequest(QueryId queryId) {
+    float queryMasterDefaultDiskSlot = masterContext.getConf().getFloatVar(
+        TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT);
+    int queryMasterDefaultMemoryMB = masterContext.getConf().getIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB);
+
+    WorkerResourceAllocationRequest.Builder builder = WorkerResourceAllocationRequest.newBuilder();
+    builder.setQueryId(queryId.getProto());
+    builder.setMaxMemoryMBPerContainer(queryMasterDefaultMemoryMB);
+    builder.setMinMemoryMBPerContainer(queryMasterDefaultMemoryMB);
+    builder.setMaxDiskSlotPerContainer(queryMasterDefaultDiskSlot);
+    builder.setMinDiskSlotPerContainer(queryMasterDefaultDiskSlot);
+    builder.setResourceRequestPriority(TajoMasterProtocol.ResourceRequestPriority.MEMORY);
+    builder.setNumContainers(1);
+    return builder.build();
+  }
+
+  /**
+   * Allocates one container for the query master of the given query and registers it.
+   * Blocks for at most 3 seconds.
+   *
+   * @return the allocated container, or {@code null} if nothing could be allocated in time
+   */
+  @Override
+  public WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress) {
+    // Create a resource request for a query master
+    WorkerResourceAllocationRequest qmResourceRequest = createQMResourceRequest(queryInProgress.getQueryId());
+
+    // The request is served asynchronously by WorkerResourceAllocationThread.
+    CallFuture<WorkerResourceAllocationResponse> callFuture = new CallFuture<WorkerResourceAllocationResponse>();
+    allocateWorkerResources(qmResourceRequest, callFuture);
+
+    // Wait for 3 seconds.
+    // NOTE(review): on timeout the request stays queued. If it is served later, the reserved
+    // container is never registered here and therefore never released; needs a cancellation path.
+    WorkerResourceAllocationResponse response;
+    try {
+      response = callFuture.get(3, TimeUnit.SECONDS);
+    } catch (Throwable t) {
+      if (t instanceof InterruptedException) {
+        // Preserve the caller's interrupt status.
+        Thread.currentThread().interrupt();
+      }
+      LOG.error("Failed to allocate a QueryMaster resource for " + queryInProgress.getQueryId(), t);
+      return null;
+    }
+
+    if (response == null || response.getWorkerAllocatedResourceList().isEmpty()) {
+      return null;
+    }
+
+    WorkerAllocatedResource resource = response.getWorkerAllocatedResource(0);
+    registerQueryMaster(queryInProgress.getQueryId(), resource.getContainerId());
+    return resource;
+  }
+
+  private void registerQueryMaster(QueryId queryId, ContainerIdProto containerId) {
+    rmContext.getQueryMasterContainer().putIfAbsent(queryId, containerId);
+  }
+
+  /**
+   * Queues an allocation request. The callback is invoked from the allocator thread once at
+   * least one container has been reserved.
+   */
+  @Override
+  public void allocateWorkerResources(WorkerResourceAllocationRequest request,
+                                      RpcCallback<WorkerResourceAllocationResponse> callBack) {
+    try {
+      //TODO checking queue size
+      requestQueue.put(new WorkerResourceRequest(new QueryId(request.getQueryId()), false, request, callBack));
+    } catch (InterruptedException e) {
+      // The request was not queued; keep the interrupt visible to the caller.
+      Thread.currentThread().interrupt();
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  /** A queued allocation request together with the callback that receives its response. */
+  static class WorkerResourceRequest {
+    boolean queryMasterRequest;
+    QueryId queryId;
+    WorkerResourceAllocationRequest request;
+    RpcCallback<WorkerResourceAllocationResponse> callBack;
+
+    WorkerResourceRequest(
+        QueryId queryId,
+        boolean queryMasterRequest, WorkerResourceAllocationRequest request,
+        RpcCallback<WorkerResourceAllocationResponse> callBack) {
+      this.queryId = queryId;
+      this.queryMasterRequest = queryMasterRequest;
+      this.request = request;
+      this.callBack = callBack;
+    }
+  }
+
+  /** Amount of memory and disk slots reserved on one worker for one container. */
+  static class AllocatedWorkerResource {
+    Worker worker;
+    int allocatedMemoryMB;
+    float allocatedDiskSlots;
+  }
+
+  /**
+   * Takes requests from {@link #requestQueue} one by one. If no resources can be reserved for a
+   * request, the request is re-queued at the tail and the thread sleeps 100 ms.
+   */
+  class WorkerResourceAllocationThread extends Thread {
+    @Override
+    public void run() {
+      LOG.info("WorkerResourceAllocationThread start");
+      while (!stopped.get()) {
+        try {
+          WorkerResourceRequest resourceRequest = requestQueue.take();
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("allocateWorkerResources:" +
+                (new QueryId(resourceRequest.request.getQueryId())) +
+                ", requiredMemory:" + resourceRequest.request.getMinMemoryMBPerContainer() +
+                "~" + resourceRequest.request.getMaxMemoryMBPerContainer() +
+                ", requiredContainers:" + resourceRequest.request.getNumContainers() +
+                ", requiredDiskSlots:" + resourceRequest.request.getMinDiskSlotPerContainer() +
+                "~" + resourceRequest.request.getMaxDiskSlotPerContainer() +
+                ", queryMasterRequest=" + resourceRequest.queryMasterRequest +
+                ", liveWorkers=" + rmContext.getWorkers().size());
+          }
+
+          List<AllocatedWorkerResource> allocatedWorkerResources = chooseWorkers(resourceRequest);
+
+          if (allocatedWorkerResources.isEmpty()) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("=========================================");
+              LOG.debug("Available Workers");
+              // Iterate values directly: a worker may be removed concurrently.
+              for (Worker liveWorker : rmContext.getWorkers().values()) {
+                LOG.debug(liveWorker.toString());
+              }
+              LOG.debug("=========================================");
+            }
+            // Nothing could be reserved yet: retry later.
+            requestQueue.put(resourceRequest);
+            Thread.sleep(100);
+          } else {
+            List<WorkerAllocatedResource> allocatedResources =
+                toAllocatedResources(resourceRequest.queryId, allocatedWorkerResources);
+
+            resourceRequest.callBack.run(WorkerResourceAllocationResponse.newBuilder()
+                .setQueryId(resourceRequest.request.getQueryId())
+                .addAllWorkerAllocatedResource(allocatedResources)
+                .build()
+            );
+          }
+        } catch (InterruptedException ie) {
+          // serviceStop() sets 'stopped' and then interrupts; only unexpected interrupts are errors.
+          if (!stopped.get()) {
+            LOG.error(ie.getMessage(), ie);
+          }
+        } catch (RuntimeException e) {
+          // Do not let one failing request (e.g. a throwing callback) kill the only allocator.
+          LOG.error("Failed to serve a worker resource request", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Assigns a fresh container id to every reservation, records the reservation in
+   * {@link #allocatedResourceMap}, and returns the descriptions to send to the requester.
+   */
+  private List<WorkerAllocatedResource> toAllocatedResources(QueryId queryId,
+                                                            List<AllocatedWorkerResource> reservations) {
+    List<WorkerAllocatedResource> allocatedResources =
+        new ArrayList<WorkerAllocatedResource>(reservations.size());
+
+    for (AllocatedWorkerResource reservation : reservations) {
+      Worker worker = reservation.worker;
+      NodeId nodeId = NodeId.newInstance(worker.getHostName(), worker.getPeerRpcPort());
+
+      TajoWorkerContainerId containerId = new TajoWorkerContainerId();
+      containerId.setApplicationAttemptId(ApplicationIdUtils.createApplicationAttemptId(queryId));
+      containerId.setId(containerIdSeq.incrementAndGet());
+
+      ContainerIdProto containerIdProto = containerId.getProto();
+      allocatedResources.add(WorkerAllocatedResource.newBuilder()
+          .setContainerId(containerIdProto)
+          .setNodeId(nodeId.toString())
+          .setWorkerHost(worker.getHostName())
+          .setQueryMasterPort(worker.getQueryMasterPort())
+          .setClientPort(worker.getClientPort())
+          .setPeerRpcPort(worker.getPeerRpcPort())
+          .setWorkerPullServerPort(worker.getPullServerPort())
+          .setAllocatedMemoryMB(reservation.allocatedMemoryMB)
+          .setAllocatedDiskSlots(reservation.allocatedDiskSlots)
+          .build());
+
+      allocatedResourceMap.putIfAbsent(containerIdProto, reservation);
+    }
+    return allocatedResources;
+  }
+
+  /**
+   * Reserves up to {@code numContainers} containers on randomly ordered live workers.
+   *
+   * <p>The primary resource is memory for MEMORY priority and disk slots otherwise.
+   * Workers are first required to offer the per-container maximum of the primary resource.
+   * Once every worker is insufficient at that level, the minimum is used instead.
+   * A worker may receive several containers.
+   *
+   * @return the reservations made; possibly fewer than requested, possibly empty
+   */
+  private List<AllocatedWorkerResource> chooseWorkers(WorkerResourceRequest resourceRequest) {
+    WorkerResourceAllocationRequest request = resourceRequest.request;
+    boolean memoryPriority =
+        request.getResourceRequestPriority() == TajoMasterProtocol.ResourceRequestPriority.MEMORY;
+    int numContainers = request.getNumContainers();
+
+    List<AllocatedWorkerResource> selectedWorkers = new ArrayList<AllocatedWorkerResource>();
+
+    synchronized (rmContext) {
+      List<String> randomWorkers = new ArrayList<String>(rmContext.getWorkers().keySet());
+      Collections.shuffle(randomWorkers);
+
+      int liveWorkerSize = randomWorkers.size();
+      Set<String> insufficientWorkers = new HashSet<String>();
+      boolean checkMax = true;
+
+      while (selectedWorkers.size() < numContainers) {
+        if (insufficientWorkers.size() >= liveWorkerSize) {
+          if (!checkMax) {
+            break;
+          }
+          // No worker satisfies the maximum: fall back to the minimum and re-scan all workers.
+          insufficientWorkers.clear();
+          checkMax = false;
+        }
+
+        for (String eachWorker : randomWorkers) {
+          if (selectedWorkers.size() >= numContainers || insufficientWorkers.size() >= liveWorkerSize) {
+            break;
+          }
+
+          // May be null: workers are removed from the map without holding the rmContext monitor.
+          Worker worker = rmContext.getWorkers().get(eachWorker);
+          AllocatedWorkerResource allocated =
+              (worker == null) ? null : allocateOnWorker(worker, request, memoryPriority, checkMax);
+
+          if (allocated == null) {
+            insufficientWorkers.add(eachWorker);
+          } else {
+            selectedWorkers.add(allocated);
+          }
+        }
+      }
+    }
+    return selectedWorkers;
+  }
+
+  /**
+   * Reserves one container on {@code worker} if its available primary resource reaches the
+   * per-container maximum ({@code checkMax}) or minimum. The primary resource is capped at the
+   * maximum. The secondary resource is capped at the larger of its min/max and at what the worker
+   * has available.
+   *
+   * @return the reservation (already charged to the worker), or {@code null} if the worker is insufficient
+   */
+  private static AllocatedWorkerResource allocateOnWorker(Worker worker,
+                                                          WorkerResourceAllocationRequest request,
+                                                          boolean memoryPriority,
+                                                          boolean checkMax) {
+    WorkerResource workerResource = worker.getResource();
+    int allocatedMemoryMB;
+    float allocatedDiskSlots;
+
+    if (memoryPriority) {
+      int maxMemoryMB = request.getMaxMemoryMBPerContainer();
+      int requiredMemoryMB = checkMax ? maxMemoryMB : request.getMinMemoryMBPerContainer();
+      if (!(workerResource.getAvailableMemoryMB() >= requiredMemoryMB)) {
+        return null;
+      }
+      float diskSlot = Math.max(request.getMaxDiskSlotPerContainer(), request.getMinDiskSlotPerContainer());
+
+      allocatedMemoryMB = workerResource.getAvailableMemoryMB() >= maxMemoryMB
+          ? maxMemoryMB : workerResource.getAvailableMemoryMB();
+      allocatedDiskSlots = workerResource.getAvailableDiskSlots() >= diskSlot
+          ? diskSlot : workerResource.getAvailableDiskSlots();
+    } else {
+      float maxDiskSlots = request.getMaxDiskSlotPerContainer();
+      float requiredDiskSlots = checkMax ? maxDiskSlots : request.getMinDiskSlotPerContainer();
+      if (!(workerResource.getAvailableDiskSlots() >= requiredDiskSlots)) {
+        return null;
+      }
+      int memoryMB = Math.max(request.getMaxMemoryMBPerContainer(), request.getMinMemoryMBPerContainer());
+
+      allocatedDiskSlots = workerResource.getAvailableDiskSlots() >= maxDiskSlots
+          ? maxDiskSlots : workerResource.getAvailableDiskSlots();
+      allocatedMemoryMB = workerResource.getAvailableMemoryMB() >= memoryMB
+          ? memoryMB : workerResource.getAvailableMemoryMB();
+    }
+
+    AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource();
+    allocatedWorkerResource.worker = worker;
+    allocatedWorkerResource.allocatedMemoryMB = allocatedMemoryMB;
+    allocatedWorkerResource.allocatedDiskSlots = allocatedDiskSlots;
+
+    workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
+        allocatedWorkerResource.allocatedMemoryMB);
+    return allocatedWorkerResource;
+  }
+
+  /**
+   * Release allocated resource.
+   *
+   * <p>The reservation is removed from the bookkeeping map, so releasing the same container
+   * twice does not credit the worker twice.
+   *
+   * @param containerId ContainerIdProto to be released
+   */
+  @Override
+  public void releaseWorkerResource(ContainerIdProto containerId) {
+    AllocatedWorkerResource allocated = (containerId == null) ? null : allocatedResourceMap.remove(containerId);
+    if (allocated == null) {
+      LOG.warn("No AllocatedWorkerResource data for [" + containerId + "]");
+      return;
+    }
+    LOG.info("Release Resource: " + allocated.allocatedDiskSlots + "," + allocated.allocatedMemoryMB);
+    allocated.worker.getResource().releaseResource(allocated.allocatedDiskSlots, allocated.allocatedMemoryMB);
+  }
+
+  /** @return true if no query master container is registered for the query */
+  @Override
+  public boolean isQueryMasterStopped(QueryId queryId) {
+    return !rmContext.getQueryMasterContainer().containsKey(queryId);
+  }
+
+  /** Unregisters the query master container of the query and releases its resources. */
+  @Override
+  public void stopQueryMaster(QueryId queryId) {
+    // A single remove() avoids the race of a separate containsKey() check.
+    ContainerIdProto containerId = rmContext.getQueryMasterContainer().remove(queryId);
+    if (containerId == null) {
+      LOG.warn("No QueryMaster resource info for " + queryId);
+      return;
+    }
+    releaseWorkerResource(containerId);
+    LOG.info(String.format("Released QueryMaster (%s) resource", queryId.toString()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
new file mode 100644
index 0000000..0d6b5ee
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+
+import java.util.EnumSet;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Resource capability, network endpoints and lifecycle state of a single Tajo worker.
+ *
+ * <p>The lifecycle is driven by {@link WorkerEvent}s through a YARN {@link StateMachine}:
+ * <ul>
+ *   <li>NEW --STARTED--&gt; RUNNING</li>
+ *   <li>RUNNING/UNHEALTHY --STATE_UPDATE--&gt; RUNNING (the transition currently always reports RUNNING)</li>
+ *   <li>RUNNING/UNHEALTHY --EXPIRE--&gt; LOST</li>
+ *   <li>RUNNING/UNHEALTHY --RECONNECTED--&gt; unchanged</li>
+ * </ul>
+ * State transitions and the last heartbeat time are guarded by a read/write lock; the
+ * endpoint setters are not.
+ */
+public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> {
+  /** class logger */
+  private static final Log LOG = LogFactory.getLog(Worker.class);
+
+  private final ReentrantReadWriteLock.ReadLock readLock;
+  private final ReentrantReadWriteLock.WriteLock writeLock;
+
+  /** context of {@link org.apache.tajo.master.rm.TajoWorkerResourceManager} */
+  private final TajoRMContext rmContext;
+
+  /** Hostname */
+  private String hostName;
+  /** QueryMaster rpc port */
+  private int qmRpcPort;
+  /** Peer rpc port */
+  private int peerRpcPort;
+  /** http info port */
+  private int httpInfoPort;
+  /** the port of QueryMaster client rpc which provides a client API */
+  private int qmClientPort;
+  /** pull server port */
+  private int pullServerPort;
+  /** last heartbeat time */
+  private long lastHeartbeatTime;
+
+  /** Resource capability */
+  private WorkerResource resource;
+
+  private static final ReconnectNodeTransition RECONNECT_NODE_TRANSITION = new ReconnectNodeTransition();
+  private static final StatusUpdateTransition STATUS_UPDATE_TRANSITION = new StatusUpdateTransition();
+
+  private static final StateMachineFactory<Worker,
+      WorkerState,
+      WorkerEventType,
+      WorkerEvent> stateMachineFactory
+      = new StateMachineFactory<Worker,
+      WorkerState,
+      WorkerEventType,
+      WorkerEvent>(WorkerState.NEW)
+
+      // Transition from NEW
+      .addTransition(WorkerState.NEW, WorkerState.RUNNING,
+          WorkerEventType.STARTED,
+          new AddNodeTransition())
+
+      // Transition from RUNNING
+      .addTransition(WorkerState.RUNNING, EnumSet.of(WorkerState.RUNNING, WorkerState.UNHEALTHY),
+          WorkerEventType.STATE_UPDATE,
+          STATUS_UPDATE_TRANSITION)
+      .addTransition(WorkerState.RUNNING, WorkerState.LOST,
+          WorkerEventType.EXPIRE,
+          new DeactivateNodeTransition(WorkerState.LOST))
+      .addTransition(WorkerState.RUNNING, WorkerState.RUNNING,
+          WorkerEventType.RECONNECTED,
+          RECONNECT_NODE_TRANSITION)
+
+      // Transitions from UNHEALTHY state
+      .addTransition(WorkerState.UNHEALTHY, EnumSet.of(WorkerState.RUNNING, WorkerState.UNHEALTHY),
+          WorkerEventType.STATE_UPDATE,
+          STATUS_UPDATE_TRANSITION)
+      .addTransition(WorkerState.UNHEALTHY, WorkerState.LOST,
+          WorkerEventType.EXPIRE,
+          new DeactivateNodeTransition(WorkerState.LOST))
+      .addTransition(WorkerState.UNHEALTHY, WorkerState.UNHEALTHY,
+          WorkerEventType.RECONNECTED,
+          RECONNECT_NODE_TRANSITION);
+
+  private final StateMachine<WorkerState, WorkerEventType, WorkerEvent> stateMachine =
+      stateMachineFactory.make(this, WorkerState.NEW);
+
+  public Worker(TajoRMContext rmContext, WorkerResource resource) {
+    this.rmContext = rmContext;
+
+    this.lastHeartbeatTime = System.currentTimeMillis();
+    this.resource = resource;
+
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    this.readLock = lock.readLock();
+    this.writeLock = lock.writeLock();
+  }
+
+  /** @return the worker id, formatted as {@code host:queryMasterPort:peerRpcPort} */
+  public String getWorkerId() {
+    return hostName + ":" + qmRpcPort + ":" + peerRpcPort;
+  }
+
+  public String getHostName() {
+    return hostName;
+  }
+
+  public void setHostName(String allocatedHost) {
+    this.hostName = allocatedHost;
+  }
+
+  public int getPeerRpcPort() {
+    return peerRpcPort;
+  }
+
+  public void setPeerRpcPort(int peerRpcPort) {
+    this.peerRpcPort = peerRpcPort;
+  }
+
+  public int getQueryMasterPort() {
+    return qmRpcPort;
+  }
+
+  public void setQueryMasterPort(int queryMasterPort) {
+    this.qmRpcPort = queryMasterPort;
+  }
+
+  public int getClientPort() {
+    return qmClientPort;
+  }
+
+  public void setClientPort(int clientPort) {
+    this.qmClientPort = clientPort;
+  }
+
+  public int getPullServerPort() {
+    return pullServerPort;
+  }
+
+  public void setPullServerPort(int pullServerPort) {
+    this.pullServerPort = pullServerPort;
+  }
+
+  public int getHttpPort() {
+    return httpInfoPort;
+  }
+
+  public void setHttpPort(int port) {
+    this.httpInfoPort = port;
+  }
+
+  public void setLastHeartbeatTime(long lastheartbeatReportTime) {
+    this.writeLock.lock();
+
+    try {
+      this.lastHeartbeatTime = lastheartbeatReportTime;
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  public long getLastHeartbeatTime() {
+    this.readLock.lock();
+
+    try {
+      return this.lastHeartbeatTime;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /**
+   *
+   * @return the current state of worker
+   */
+  public WorkerState getState() {
+    this.readLock.lock();
+
+    try {
+      return this.stateMachine.getCurrentState();
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /**
+   *
+   * @return the current resource capability of worker
+   */
+  public WorkerResource getResource() {
+    return this.resource;
+  }
+
+  /** Orders workers by worker id; a {@code null} argument sorts first. */
+  @Override
+  public int compareTo(Worker o) {
+    if (o == null) {
+      return 1;
+    }
+    return getWorkerId().compareTo(o.getWorkerId());
+  }
+
+  /** NEW -&gt; RUNNING: registers query-master-mode workers in the query master set. */
+  public static class AddNodeTransition implements SingleArcTransition<Worker, WorkerEvent> {
+    @Override
+    public void transition(Worker worker, WorkerEvent workerEvent) {
+
+      if (worker.getResource().isQueryMasterMode()) {
+        worker.rmContext.getQueryMasterWorker().add(worker.getWorkerId());
+      }
+      LOG.info("Worker with " + worker.getResource() + " is joined to Tajo cluster");
+    }
+  }
+
+  /** Applies a status report: refreshes heartbeat time, running-task count and heap figures. */
+  public static class StatusUpdateTransition implements
+      MultipleArcTransition<Worker, WorkerEvent, WorkerState> {
+
+    @Override
+    public WorkerState transition(Worker worker, WorkerEvent event) {
+      WorkerStatusEvent statusEvent = (WorkerStatusEvent) event;
+
+      // TODO - the synchronization scope using rmContext is too coarse.
+      synchronized (worker.rmContext) {
+        worker.setLastHeartbeatTime(System.currentTimeMillis());
+        worker.getResource().setNumRunningTasks(statusEvent.getRunningTaskNum());
+        worker.getResource().setMaxHeap(statusEvent.maxHeap());
+        worker.getResource().setFreeHeap(statusEvent.getFreeHeap());
+        worker.getResource().setTotalHeap(statusEvent.getTotalHeap());
+      }
+
+      return WorkerState.RUNNING;
+    }
+  }
+
+  /**
+   * Moves the worker from the active to the inactive worker map and drops its query master
+   * registration, so a lost worker is no longer reported as a query master.
+   */
+  public static class DeactivateNodeTransition implements SingleArcTransition<Worker, WorkerEvent> {
+    private final WorkerState finalState;
+
+    public DeactivateNodeTransition(WorkerState finalState) {
+      this.finalState = finalState;
+    }
+
+    @Override
+    public void transition(Worker worker, WorkerEvent workerEvent) {
+
+      worker.rmContext.getWorkers().remove(worker.getWorkerId());
+      // Undo the registration done by AddNodeTransition; a no-op for non query-master workers.
+      worker.rmContext.getQueryMasterWorker().remove(worker.getWorkerId());
+      LOG.info("Deactivating Node " + worker.getWorkerId() + " as it is now " + finalState);
+      worker.rmContext.getInactiveWorkers().putIfAbsent(worker.getWorkerId(), worker);
+    }
+  }
+
+  /**
+   * Replaces the stored worker with the one carried by the reconnect event and dispatches
+   * STARTED for it.
+   */
+  public static class ReconnectNodeTransition implements SingleArcTransition<Worker, WorkerEvent> {
+
+    @Override
+    public void transition(Worker worker, WorkerEvent workerEvent) {
+      WorkerReconnectEvent castedEvent = (WorkerReconnectEvent) workerEvent;
+
+      Worker newWorker = castedEvent.getWorker();
+      worker.rmContext.getWorkers().put(castedEvent.getWorkerId(), newWorker);
+      worker.rmContext.getDispatcher().getEventHandler().handle(
+          new WorkerEvent(worker.getWorkerId(), WorkerEventType.STARTED));
+    }
+  }
+
+  /**
+   * Feeds an event to the state machine under the write lock. Invalid transitions are logged
+   * and otherwise ignored.
+   */
+  @Override
+  public void handle(WorkerEvent event) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getWorkerId() + " of type " + event.getType());
+    }
+    // Acquire before try: if lock() failed, finally must not unlock a lock we do not hold.
+    writeLock.lock();
+    try {
+      WorkerState oldState = getState();
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state", e);
+        LOG.error("Invalid event " + event.getType() + " on Worker " + getWorkerId());
+      }
+      WorkerState newState = getState();
+      if (oldState != newState) {
+        LOG.info(getWorkerId() + " Node Transitioned from " + oldState + " to " + newState);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
new file mode 100644
index 0000000..389c3be
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Base type of every event delivered to a {@link Worker}.
+ * Besides its {@link WorkerEventType}, it carries the id of the worker it is addressed to,
+ * which is used to look up the target worker.
+ */
+public class WorkerEvent extends AbstractEvent<WorkerEventType> {
+
+  /** id of the worker this event targets */
+  private final String workerId;
+
+  /**
+   * @param workerId id of the target worker
+   * @param workerEventType kind of this event
+   */
+  public WorkerEvent(String workerId, WorkerEventType workerEventType) {
+    super(workerEventType);
+    this.workerId = workerId;
+  }
+
+  /** @return id of the worker this event is addressed to */
+  public String getWorkerId() {
+    return this.workerId;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEventType.java
new file mode 100644
index 0000000..0c97654
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEventType.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+/**
+ * Kinds of {@link WorkerEvent} that drive the state machine of a {@link Worker}.
+ * Constant order is kept as-is (ordinal values).
+ */
+public enum WorkerEventType {
+
+ // Source : TajoResourceTracker, Destination: Worker (STARTED, STATE_UPDATE, RECONNECTED)
+
+ /** Moves a NEW worker to RUNNING. Worker also dispatches it itself after handling RECONNECTED. */
+ STARTED,
+ /** Status report; refreshes the worker's heartbeat time, running-task count and heap figures. */
+ STATE_UPDATE,
+ /** A worker reconnected; the stored Worker entry is replaced by the one carried in the event. */
+ RECONNECTED,
+
+ /** Source : {@link WorkerLivelinessMonitor}, Destination: {@link Worker}. Moves the worker to LOST. */
+ EXPIRE
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
new file mode 100644
index 0000000..e3524d6
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tajo.conf.TajoConf;
+
+/**
+ * Periodically checks the latest heartbeat time of each {@link Worker}.
+ * If a worker's latest heartbeat is older than the configured timeout, this monitor sends an
+ * {@link WorkerEventType#EXPIRE} event for that worker through the dispatcher's event handler.
+ */
+public class WorkerLivelinessMonitor extends AbstractLivelinessMonitor<String> {
+
+  /**
+   * Lower bound for the liveliness check interval. The interval is derived as timeout / 3 with
+   * integer division, so a timeout below 3 ms would otherwise yield an interval of 0.
+   * NOTE(review): presumably a 0 interval makes the base class's check loop spin — confirm.
+   */
+  private static final int MIN_MONITOR_INTERVAL_MS = 1;
+
+  /** Event handler obtained from the dispatcher; receives the EXPIRE events. */
+  private final EventHandler dispatcher;
+
+  /**
+   * @param d dispatcher whose event handler receives {@link WorkerEventType#EXPIRE} events
+   */
+  public WorkerLivelinessMonitor(Dispatcher d) {
+    super(WorkerLivelinessMonitor.class.getSimpleName(), new SystemClock());
+    this.dispatcher = d.getEventHandler();
+  }
+
+  /**
+   * Reads the heartbeat timeout (milliseconds) from
+   * {@code TajoConf.ConfVars.RESOURCE_TRACKER_HEARTBEAT_TIMEOUT}, uses it as the expiry interval,
+   * and checks liveliness every third of that interval (at least every
+   * {@value #MIN_MONITOR_INTERVAL_MS} ms).
+   *
+   * @param conf configuration; must be a {@link TajoConf}
+   * @throws IllegalArgumentException if {@code conf} is not a {@link TajoConf}
+   */
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    Preconditions.checkArgument(conf instanceof TajoConf,
+        "WorkerLivelinessMonitor requires a TajoConf instance");
+    TajoConf systemConf = (TajoConf) conf;
+    // milliseconds
+    int expireIntvl = systemConf.getIntVar(TajoConf.ConfVars.RESOURCE_TRACKER_HEARTBEAT_TIMEOUT);
+    setExpireInterval(expireIntvl);
+    setMonitorInterval(Math.max(MIN_MONITOR_INTERVAL_MS, expireIntvl / 3));
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Called by the base monitor when the worker identified by {@code id} has not been pinged within
+   * the expiry interval; forwards an EXPIRE event for it.
+   *
+   * @param id the worker id
+   */
+  @Override
+  @SuppressWarnings("unchecked") // the dispatcher's EventHandler is raw-typed
+  protected void expire(String id) {
+    dispatcher.handle(new WorkerEvent(id, WorkerEventType.EXPIRE));
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
new file mode 100644
index 0000000..46f286d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+/**
+ * Event signalling that an inactive {@link Worker} has sent a ping again.
+ * {@link TajoResourceTracker} produces this event, and its destination is {@link Worker}.
+ */
+public class WorkerReconnectEvent extends WorkerEvent {
+
+  /** Worker object carried along with the reconnect notification. */
+  private final Worker worker;
+
+  /**
+   * Creates a {@link WorkerEventType#RECONNECTED} event.
+   *
+   * @param workerId id of the reconnecting worker
+   * @param worker   worker object attached to the event
+   */
+  public WorkerReconnectEvent(String workerId, Worker worker) {
+    super(workerId, WorkerEventType.RECONNECTED);
+    this.worker = worker;
+  }
+
+  /**
+   * @return the worker object supplied at construction time
+   */
+  public Worker getWorker() {
+    return this.worker;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
new file mode 100644
index 0000000..bfe186c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Describes the current resources of a worker.
+ *
+ * <p>It includes the following resource capabilities of a worker:
+ * <ul>
+ *   <li>used/total disk slots</li>
+ *   <li>used/total core slots</li>
+ *   <li>used/total memory</li>
+ *   <li>the number of running tasks</li>
+ * </ul>
+ *
+ * <p>Thread-safety: the total and used disk/CPU/memory counters are read and written only while
+ * holding {@code lock}. This lets {@link #allocateResource(float, int)} and
+ * {@link #releaseResource(float, int)} update them atomically with respect to every getter,
+ * including the derived {@code getAvailable*} values. The heap statistics, the running-task count
+ * and the mode flags are not guarded by the lock.
+ */
+public class WorkerResource {
+  private static final Log LOG = LogFactory.getLog(WorkerResource.class);
+
+  // Total capacity; guarded by lock.
+  private float diskSlots;
+  private int cpuCoreSlots;
+  private int memoryMB;
+
+  // Currently used amounts; guarded by lock.
+  private float usedDiskSlots;
+  private int usedMemoryMB;
+  private int usedCpuCoreSlots;
+
+  private long maxHeap;
+  private long freeHeap;
+  private long totalHeap;
+
+  private int numRunningTasks;
+
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final Lock rlock = lock.readLock();
+  private final Lock wlock = lock.writeLock();
+
+  private boolean queryMasterMode;
+
+  private boolean taskRunnerMode;
+
+  private final AtomicInteger numQueryMasterTasks = new AtomicInteger(0);
+
+  // Every lock is acquired before its try block: if lock() itself failed inside the try,
+  // the finally clause would call unlock() on a lock that is not held.
+
+  public float getDiskSlots() {
+    rlock.lock();
+    try {
+      return diskSlots;
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  public void setDiskSlots(float diskSlots) {
+    wlock.lock();
+    try {
+      this.diskSlots = diskSlots;
+    } finally {
+      wlock.unlock();
+    }
+  }
+
+  public int getCpuCoreSlots() {
+    rlock.lock();
+    try {
+      return cpuCoreSlots;
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  public void setCpuCoreSlots(int cpuCoreSlots) {
+    wlock.lock();
+    try {
+      this.cpuCoreSlots = cpuCoreSlots;
+    } finally {
+      wlock.unlock();
+    }
+  }
+
+  public int getMemoryMB() {
+    rlock.lock();
+    try {
+      return memoryMB;
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  public void setMemoryMB(int memoryMB) {
+    wlock.lock();
+    try {
+      this.memoryMB = memoryMB;
+    } finally {
+      wlock.unlock();
+    }
+  }
+
+  /** @return total disk slots minus used disk slots, both read under the same lock */
+  public float getAvailableDiskSlots() {
+    rlock.lock();
+    try {
+      return diskSlots - usedDiskSlots;
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  /** @return total memory minus used memory, both read under the same lock */
+  public int getAvailableMemoryMB() {
+    rlock.lock();
+    try {
+      return memoryMB - usedMemoryMB;
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  /** @return total CPU core slots minus used CPU core slots, both read under the same lock */
+  public int getAvailableCpuCoreSlots() {
+    rlock.lock();
+    try {
+      return cpuCoreSlots - usedCpuCoreSlots;
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  @Override
+  public String toString() {
+    rlock.lock();
+    try {
+      return "slots=m:" + memoryMB + ",d:" + diskSlots +
+          ",c:" + cpuCoreSlots + ", used=m:" + usedMemoryMB + ",d:" + usedDiskSlots + ",c:" + usedCpuCoreSlots;
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  public int getUsedMemoryMB() {
+    rlock.lock();
+    try {
+      return usedMemoryMB;
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  public void setUsedMemoryMB(int usedMemoryMB) {
+    wlock.lock();
+    try {
+      this.usedMemoryMB = usedMemoryMB;
+    } finally {
+      wlock.unlock();
+    }
+  }
+
+  public int getUsedCpuCoreSlots() {
+    rlock.lock();
+    try {
+      return usedCpuCoreSlots;
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  public void setUsedCpuCoreSlots(int usedCpuCoreSlots) {
+    wlock.lock();
+    try {
+      this.usedCpuCoreSlots = usedCpuCoreSlots;
+    } finally {
+      wlock.unlock();
+    }
+  }
+
+  public float getUsedDiskSlots() {
+    rlock.lock();
+    try {
+      return usedDiskSlots;
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  public boolean isQueryMasterMode() {
+    return queryMasterMode;
+  }
+
+  public void setQueryMasterMode(boolean queryMasterMode) {
+    this.queryMasterMode = queryMasterMode;
+  }
+
+  public boolean isTaskRunnerMode() {
+    return taskRunnerMode;
+  }
+
+  public void setTaskRunnerMode(boolean taskRunnerMode) {
+    this.taskRunnerMode = taskRunnerMode;
+  }
+
+  /**
+   * Returns previously allocated disk slots and memory. A used counter that would become negative
+   * is reset to zero, and a warning is logged.
+   *
+   * @param diskSlots disk slots to release
+   * @param memoryMB  memory to release
+   */
+  public void releaseResource(float diskSlots, int memoryMB) {
+    wlock.lock();
+    try {
+      usedMemoryMB = usedMemoryMB - memoryMB;
+      usedDiskSlots -= diskSlots;
+      if(usedMemoryMB < 0) {
+        LOG.warn("Used memory can't be a minus: " + usedMemoryMB);
+        usedMemoryMB = 0;
+      }
+      if(usedDiskSlots < 0) {
+        LOG.warn("Used disk slot can't be a minus: " + usedDiskSlots);
+        usedDiskSlots = 0;
+      }
+    } finally {
+      wlock.unlock();
+    }
+  }
+
+  /**
+   * Marks disk slots and memory as used. Each used counter is capped at the corresponding total.
+   *
+   * @param diskSlots disk slots to allocate
+   * @param memoryMB  memory to allocate
+   */
+  public void allocateResource(float diskSlots, int memoryMB) {
+    wlock.lock();
+    try {
+      usedMemoryMB += memoryMB;
+      usedDiskSlots += diskSlots;
+
+      if(usedMemoryMB > this.memoryMB) {
+        usedMemoryMB = this.memoryMB;
+      }
+
+      if(usedDiskSlots > this.diskSlots) {
+        usedDiskSlots = this.diskSlots;
+      }
+    } finally {
+      wlock.unlock();
+    }
+  }
+
+  public long getMaxHeap() {
+    return maxHeap;
+  }
+
+  public void setMaxHeap(long maxHeap) {
+    this.maxHeap = maxHeap;
+  }
+
+  public long getFreeHeap() {
+    return freeHeap;
+  }
+
+  public void setFreeHeap(long freeHeap) {
+    this.freeHeap = freeHeap;
+  }
+
+  public long getTotalHeap() {
+    return totalHeap;
+  }
+
+  public void setTotalHeap(long totalHeap) {
+    this.totalHeap = totalHeap;
+  }
+
+  public int getNumRunningTasks() {
+    return numRunningTasks;
+  }
+
+  public void setNumRunningTasks(int numRunningTasks) {
+    this.numRunningTasks = numRunningTasks;
+  }
+
+  public int getNumQueryMasterTasks() {
+    return numQueryMasterTasks.get();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
new file mode 100644
index 0000000..54fe11c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.protobuf.RpcCallback;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.querymaster.QueryInProgress;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
+import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerResourceAllocationResponse;
+
+/**
+ * An interface of WorkerResourceManager, which allows TajoMaster to request allocation of
+ * containers and to release the allocated containers.
+ */
+public interface WorkerResourceManager extends Service {
+
+  /**
+   * Requests a resource container for a QueryMaster.
+   *
+   * @param queryInProgress QueryInProgress
+   * @return an allocated container resource
+   * @deprecated Kept for existing callers. NOTE(review): the replacement is not documented here —
+   *             confirm whether {@link #allocateWorkerResources} is the intended alternative.
+   */
+  @Deprecated
+  WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress);
+
+  /**
+   * Requests one or more resource containers. You can set the number of containers and resource
+   * capabilities, such as memory, CPU cores, and disk slots. This is an asynchronous call: use the
+   * callback to receive the allocated resource containers. Each container is identified by a
+   * {@link org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto}.
+   *
+   * @param request     request description
+   * @param rpcCallBack callback that receives the allocation response
+   */
+  void allocateWorkerResources(TajoMasterProtocol.WorkerResourceAllocationRequest request,
+                               RpcCallback<WorkerResourceAllocationResponse> rpcCallBack);
+
+  /**
+   * Releases a container.
+   *
+   * @param containerId ContainerIdProto to be released
+   */
+  void releaseWorkerResource(ContainerIdProto containerId);
+
+  /**
+   * NOTE(review): judging by the name, this returns a seed string used to derive query ids —
+   * confirm against the implementations.
+   *
+   * @return the seed query id
+   * @throws IOException propagated from the implementation
+   */
+  String getSeedQueryId() throws IOException;
+
+  /**
+   * Checks whether a query master is stopped.
+   *
+   * @param queryId QueryId to be checked
+   * @return true if the QueryMaster is stopped
+   */
+  boolean isQueryMasterStopped(QueryId queryId);
+
+  /**
+   * Stops a query master.
+   *
+   * @param queryId QueryId of the query whose master should be stopped
+   */
+  void stopQueryMaster(QueryId queryId);
+
+  /**
+   * @return a Map instance containing active workers, keyed by worker id
+   */
+  Map<String, Worker> getWorkers();
+
+  /**
+   * @return a Map instance containing inactive workers, keyed by worker id
+   */
+  Map<String, Worker> getInactiveWorkers();
+
+  /**
+   * Stops this resource manager. This redeclares {@link Service#stop()}.
+   */
+  void stop();
+
+  /**
+   * @return the overall summary of cluster resources
+   */
+  TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary();
+
+  /**
+   * @return WorkerIds on which QueryMasters are running
+   */
+  Collection<String> getQueryMasters();
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerState.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerState.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerState.java
new file mode 100644
index 0000000..a941008
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerState.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+/**
+ * It presents the states of {@link Worker}.
+ */
+public enum WorkerState {
+ /** New worker */
+ NEW,
+
+ /** Running worker */
+ RUNNING,
+
+ /** Worker is unhealthy */
+ UNHEALTHY,
+
+ /** worker is out of service */
+ DECOMMISSIONED,
+
+ /** worker has not sent a heartbeat for some configured time threshold */
+ LOST;
+
+ @SuppressWarnings("unused")
+ public boolean isUnusable() {
+ return (this == UNHEALTHY || this == DECOMMISSIONED || this == LOST);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
new file mode 100644
index 0000000..8c3d7c1
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+/**
+ * {@link TajoResourceTracker} produces this event, and its destination is
+ * {@link org.apache.tajo.master.rm.Worker.StatusUpdateTransition} of {@link Worker}.
+ *
+ * <p>It carries the status a worker reported: its number of running tasks and its max, free and
+ * total heap figures.
+ */
+public class WorkerStatusEvent extends WorkerEvent {
+  private final int runningTaskNum;
+  private final long maxHeap;
+  private final long freeHeap;
+  private final long totalHeap;
+
+  public WorkerStatusEvent(String workerId, int runningTaskNum, long maxHeap, long freeHeap, long totalHeap) {
+    super(workerId, WorkerEventType.STATE_UPDATE);
+    this.runningTaskNum = runningTaskNum;
+    this.maxHeap = maxHeap;
+    this.freeHeap = freeHeap;
+    this.totalHeap = totalHeap;
+  }
+
+  /** @return the number of running tasks reported by the worker */
+  public int getRunningTaskNum() {
+    return runningTaskNum;
+  }
+
+  /** @return the max heap value reported by the worker */
+  public long getMaxHeap() {
+    return maxHeap;
+  }
+
+  /**
+   * @return the max heap value reported by the worker
+   * @deprecated use {@link #getMaxHeap()}, which follows the naming of the other accessors
+   */
+  @Deprecated
+  public long maxHeap() {
+    return getMaxHeap();
+  }
+
+  /** @return the free heap value reported by the worker */
+  public long getFreeHeap() {
+    return freeHeap;
+  }
+
+  /** @return the total heap value reported by the worker */
+  public long getTotalHeap() {
+    return totalHeap;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
new file mode 100644
index 0000000..b9e132b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.master.event.ContainerAllocationEvent;
+import org.apache.tajo.master.event.ContainerAllocatorEventType;
+import org.apache.tajo.master.event.SubQueryContainerAllocationEvent;
+import org.apache.tajo.master.querymaster.Query;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.master.querymaster.SubQueryState;
+import org.apache.tajo.util.ApplicationIdUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class YarnRMContainerAllocator extends AMRMClientImpl
+ implements EventHandler<ContainerAllocationEvent> {
+
+ /** Class Logger */
+ private static final Log LOG = LogFactory.getLog(YarnRMContainerAllocator.
+ class.getName());
+
+ private QueryMasterTask.QueryMasterTaskContext context;
+ private ApplicationAttemptId appAttemptId;
+ private final EventHandler eventHandler;
+
+ public YarnRMContainerAllocator(QueryMasterTask.QueryMasterTaskContext context) {
+ super();
+ this.context = context;
+ this.appAttemptId = ApplicationIdUtils.createApplicationAttemptId(context.getQueryId());
+ this.eventHandler = context.getDispatcher().getEventHandler();
+ }
+
+ public void init(Configuration conf) {
+ super.init(conf);
+ }
+
+ private static final int WAIT_INTERVAL_AVAILABLE_NODES = 500; // 0.5 second
+ public void start() {
+ super.start();
+
+ RegisterApplicationMasterResponse response;
+ try {
+ response = registerApplicationMaster("localhost", 10080, "http://localhost:1234");
+
+ // If the number of cluster nodes is ZERO, it waits for available nodes.
+ AllocateResponse allocateResponse = allocate(0.0f);
+ while(allocateResponse.getNumClusterNodes() < 1) {
+ try {
+ Thread.sleep(WAIT_INTERVAL_AVAILABLE_NODES);
+ LOG.info("Waiting for Available Cluster Nodes");
+ allocateResponse = allocate(0);
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ }
+ }
+ context.getQueryMasterContext().getWorkerContext().setNumClusterNodes(allocateResponse.getNumClusterNodes());
+ } catch (IOException e) {
+ LOG.error(e);
+ } catch (YarnException e) {
+ LOG.error(e);
+ }
+
+ startAllocatorThread();
+ }
+
+ protected Thread allocatorThread;
+ private final AtomicBoolean stopped = new AtomicBoolean(false);
+ private int rmPollInterval = 100;//millis
+
+ protected void startAllocatorThread() {
+ allocatorThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+ try {
+ try {
+ heartbeat();
+ } catch (YarnException e) {
+ LOG.error("Error communicating with RM: " + e.getMessage() , e);
+ return;
+ } catch (Exception e) {
+ LOG.error("ERROR IN CONTACTING RM. ", e);
+ // TODO: for other exceptions
+ if(stopped.get()) {
+ break;
+ }
+ }
+ Thread.sleep(rmPollInterval);
+ } catch (InterruptedException e) {
+ if (!stopped.get()) {
+ LOG.warn("Allocated thread interrupted. Returning.");
+ }
+ break;
+ }
+ }
+ LOG.info("Allocated thread stopped");
+ }
+ });
+ allocatorThread.setName("YarnRMContainerAllocator");
+ allocatorThread.start();
+ }
+
+ public void stop() {
+ if(stopped.get()) {
+ return;
+ }
+ LOG.info("un-registering ApplicationMaster(QueryMaster):" + appAttemptId);
+ stopped.set(true);
+
+ try {
+ FinalApplicationStatus status = FinalApplicationStatus.UNDEFINED;
+ Query query = context.getQuery();
+ if (query != null) {
+ TajoProtos.QueryState state = query.getState();
+ if (state == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+ status = FinalApplicationStatus.SUCCEEDED;
+ } else if (state == TajoProtos.QueryState.QUERY_FAILED || state == TajoProtos.QueryState.QUERY_ERROR) {
+ status = FinalApplicationStatus.FAILED;
+ } else if (state == TajoProtos.QueryState.QUERY_ERROR) {
+ status = FinalApplicationStatus.FAILED;
+ }
+ }
+ unregisterApplicationMaster(status, "tajo query finished", null);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+
+ allocatorThread.interrupt();
+ LOG.info("un-registered ApplicationMAster(QueryMaster) stopped:" + appAttemptId);
+
+ super.stop();
+ }
+
+ private final Map<Priority, ExecutionBlockId> subQueryMap =
+ new HashMap<Priority, ExecutionBlockId>();
+
+ private AtomicLong prevReportTime = new AtomicLong(0);
+ private int reportInterval = 5 * 1000; // second
+
+ public void heartbeat() throws Exception {
+ AllocateResponse allocateResponse = allocate(context.getProgress());
+
+ List<Container> allocatedContainers = allocateResponse.getAllocatedContainers();
+
+ long currentTime = System.currentTimeMillis();
+ if ((currentTime - prevReportTime.longValue()) >= reportInterval) {
+ LOG.debug("Available Cluster Nodes: " + allocateResponse.getNumClusterNodes());
+ LOG.debug("Num of Allocated Containers: " + allocatedContainers.size());
+ LOG.info("Available Resource: " + allocateResponse.getAvailableResources());
+ prevReportTime.set(currentTime);
+ }
+
+ if (allocatedContainers.size() > 0) {
+ LOG.info("================================================================");
+ for (Container container : allocateResponse.getAllocatedContainers()) {
+ LOG.info("> Container Id: " + container.getId());
+ LOG.info("> Node Id: " + container.getNodeId());
+ LOG.info("> Resource (Mem): " + container.getResource().getMemory());
+ LOG.info("> Priority: " + container.getPriority());
+ }
+ LOG.info("================================================================");
+
+ Map<ExecutionBlockId, List<Container>> allocated = new HashMap<ExecutionBlockId, List<Container>>();
+ for (Container container : allocatedContainers) {
+ ExecutionBlockId executionBlockId = subQueryMap.get(container.getPriority());
+ SubQueryState state = context.getSubQuery(executionBlockId).getState();
+ if (!(SubQuery.isRunningState(state))) {
+ releaseAssignedContainer(container.getId());
+ } else {
+ if (allocated.containsKey(executionBlockId)) {
+ allocated.get(executionBlockId).add(container);
+ } else {
+ allocated.put(executionBlockId, Lists.newArrayList(container));
+ }
+ }
+ }
+
+ for (Entry<ExecutionBlockId, List<Container>> entry : allocated.entrySet()) {
+ eventHandler.handle(new SubQueryContainerAllocationEvent(entry.getKey(), entry.getValue()));
+ }
+ }
+ }
+
+ @Override
+ public void handle(ContainerAllocationEvent event) {
+
+ if (event.getType() == ContainerAllocatorEventType.CONTAINER_REQ) {
+ LOG.info(event);
+ subQueryMap.put(event.getPriority(), event.getExecutionBlockId());
+ addContainerRequest(new ContainerRequest(event.getCapability(), null, null,
+ event.getPriority()));
+
+ } else if (event.getType() == ContainerAllocatorEventType.CONTAINER_DEALLOCATE) {
+ LOG.info(event);
+ } else {
+ LOG.info(event);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
new file mode 100644
index 0000000..6d5268c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.protobuf.RpcCallback;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.exception.UnimplementedException;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.YarnContainerProxy;
+import org.apache.tajo.master.querymaster.QueryInProgress;
+import org.apache.tajo.util.ApplicationIdUtils;
+import org.apache.tajo.worker.TajoWorker;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.*;
+
+import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
+import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerResourceAllocationResponse;
+
+public class YarnTajoResourceManager extends AbstractService implements WorkerResourceManager {
+ private static final Log LOG = LogFactory.getLog(YarnTajoResourceManager.class);
+
+ private YarnClient yarnClient;
+ private ApplicationMasterProtocol rmClient;
+ private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ private Configuration conf;
+ private TajoMaster.MasterContext masterContext;
+
+ public YarnTajoResourceManager() {
+ super(YarnTajoResourceManager.class.getSimpleName());
+ }
+
+ public YarnTajoResourceManager(TajoMaster.MasterContext masterContext) {
+ super(YarnTajoResourceManager.class.getSimpleName());
+ this.masterContext = masterContext;
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ @Override
+ public Map<String, Worker> getWorkers() {
+ return new HashMap<String, Worker>();
+ }
+
+ @Override
+ public Map<String, Worker> getInactiveWorkers() {
+ return new HashMap<String, Worker>();
+ }
+
+ public Collection<String> getQueryMasters() {
+ return new ArrayList<String>();
+ }
+
+ public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
+ return TajoMasterProtocol.ClusterResourceSummary.newBuilder()
+ .setNumWorkers(0)
+ .setTotalCpuCoreSlots(0)
+ .setTotalDiskSlots(0)
+ .setTotalMemoryMB(0)
+ .setTotalAvailableCpuCoreSlots(0)
+ .setTotalAvailableDiskSlots(0)
+ .setTotalAvailableMemoryMB(0)
+ .build();
+ }
+
+ @Override
+ public void releaseWorkerResource(YarnProtos.ContainerIdProto containerId) {
+ throw new UnimplementedException("releaseWorkerResource");
+ }
+
+ @Override
+ public WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress) {
+ throw new UnimplementedException("allocateQueryMaster");
+ }
+
+ @Override
+ public void allocateWorkerResources(
+ TajoMasterProtocol.WorkerResourceAllocationRequest request,
+ RpcCallback<WorkerResourceAllocationResponse> rpcCallBack) {
+ throw new UnimplementedException("allocateWorkerResources");
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ this.conf = conf;
+ connectYarnClient();
+
+ final YarnConfiguration yarnConf = new YarnConfiguration(conf);
+ final YarnRPC rpc = YarnRPC.create(conf);
+ final InetSocketAddress rmAddress = conf.getSocketAddr(
+ YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+
+ UserGroupInformation currentUser;
+ try {
+ currentUser = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new YarnRuntimeException(e);
+ }
+
+ rmClient = currentUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
+ @Override
+ public ApplicationMasterProtocol run() {
+ return (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol.class, rmAddress, yarnConf);
+ }
+ });
+ }
+
+ @Override
+ public String getSeedQueryId() throws IOException {
+ try {
+ YarnClientApplication app = yarnClient.createApplication();
+ return app.getApplicationSubmissionContext().getApplicationId().toString();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+
+ throw new IOException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void stopQueryMaster(QueryId queryId) {
+ try {
+ FinalApplicationStatus appStatus = FinalApplicationStatus.UNDEFINED;
+ QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId);
+ if(queryInProgress == null) {
+ return;
+ }
+ TajoProtos.QueryState state = queryInProgress.getQueryInfo().getQueryState();
+ if (state == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+ appStatus = FinalApplicationStatus.SUCCEEDED;
+ } else if (state == TajoProtos.QueryState.QUERY_FAILED || state == TajoProtos.QueryState.QUERY_ERROR) {
+ appStatus = FinalApplicationStatus.FAILED;
+ } else if (state == TajoProtos.QueryState.QUERY_ERROR) {
+ appStatus = FinalApplicationStatus.FAILED;
+ }
+ FinishApplicationMasterRequest request = recordFactory
+ .newRecordInstance(FinishApplicationMasterRequest.class);
+ request.setFinalApplicationStatus(appStatus);
+ request.setDiagnostics("QueryMaster shutdown by TajoMaster.");
+ rmClient.finishApplicationMaster(request);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ private void connectYarnClient() {
+ this.yarnClient = new YarnClientImpl();
+ this.yarnClient.init(conf);
+ this.yarnClient.start();
+ }
+
+ private ApplicationAttemptId allocateAndLaunchQueryMaster(QueryInProgress queryInProgress) throws IOException, YarnException {
+ QueryId queryId = queryInProgress.getQueryId();
+ ApplicationId appId = ApplicationIdUtils.queryIdToAppId(queryId);
+
+ LOG.info("Allocate and launch ApplicationMaster for QueryMaster: queryId=" +
+ queryId + ", appId=" + appId);
+
+ ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
+
+ // set the application id
+ appContext.setApplicationId(appId);
+ // set the application name
+ appContext.setApplicationName("Tajo");
+
+ Priority pri = Records.newRecord(Priority.class);
+ pri.setPriority(5);
+ appContext.setPriority(pri);
+
+ // Set the queue to which this application is to be submitted in the RM
+ appContext.setQueue("default");
+
+ ContainerLaunchContext commonContainerLaunchContext =
+ YarnContainerProxy.createCommonContainerLaunchContext(masterContext.getConf(), queryId.toString(), true);
+
+ // Setup environment by cloning from common env.
+ Map<String, String> env = commonContainerLaunchContext.getEnvironment();
+ Map<String, String> myEnv = new HashMap<String, String>(env.size());
+ myEnv.putAll(env);
+
+ ////////////////////////////////////////////////////////////////////////////
+ // Set the local resources
+ ////////////////////////////////////////////////////////////////////////////
+ // Set the necessary command to execute the application master
+ Vector<CharSequence> vargs = new Vector<CharSequence>(30);
+
+ // Set java executable command
+ //LOG.info("Setting up app master command");
+ vargs.add("${JAVA_HOME}" + "/bin/java");
+ // Set Xmx based on am memory size
+ String jvmOptions = masterContext.getConf().get("tajo.rm.yarn.querymaster.jvm.option", "-Xmx2000m");
+
+ for(String eachToken: jvmOptions.split((" "))) {
+ vargs.add(eachToken);
+ }
+ // Set Remote Debugging
+ //if (!context.getQuery().getSubQuery(event.getExecutionBlockId()).isLeafQuery()) {
+ //vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
+ //}
+ // Set class name
+ vargs.add(TajoWorker.class.getCanonicalName());
+ vargs.add("qm");
+ vargs.add(queryId.toString()); // queryId
+ vargs.add(masterContext.getTajoMasterService().getBindAddress().getHostName() + ":" +
+ masterContext.getTajoMasterService().getBindAddress().getPort());
+
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+
+ // Get final commmand
+ StringBuilder command = new StringBuilder();
+ for (CharSequence str : vargs) {
+ command.append(str).append(" ");
+ }
+
+ LOG.info("Completed setting up QueryMasterRunner command " + command.toString());
+ List<String> commands = new ArrayList<String>();
+ commands.add(command.toString());
+
+ final Resource resource = Records.newRecord(Resource.class);
+ // TODO - get default value from conf
+ resource.setMemory(2000);
+ resource.setVirtualCores(1);
+
+ Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
+
+ ContainerLaunchContext masterContainerContext = BuilderUtils.newContainerLaunchContext(
+ commonContainerLaunchContext.getLocalResources(),
+ myEnv,
+ commands,
+ myServiceData,
+ null,
+ new HashMap<ApplicationAccessType, String>(2)
+ );
+
+ appContext.setAMContainerSpec(masterContainerContext);
+
+ LOG.info("Submitting QueryMaster to ResourceManager");
+ yarnClient.submitApplication(appContext);
+
+ ApplicationReport appReport = monitorApplication(appId, EnumSet.of(YarnApplicationState.ACCEPTED));
+ ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
+
+ LOG.info("Launching QueryMaster with appAttemptId: " + attemptId);
+
+ return attemptId;
+ }
+
+ private ApplicationReport monitorApplication(ApplicationId appId,
+ Set<YarnApplicationState> finalState) throws IOException, YarnException {
+
+ long sleepTime = 100;
+ int count = 1;
+ while (true) {
+ // Get application report for the appId we are interested in
+ ApplicationReport report = yarnClient.getApplicationReport(appId);
+
+ LOG.info("Got application report from ASM for" + ", appId="
+ + appId.getId() + ", appAttemptId="
+ + report.getCurrentApplicationAttemptId() + ", clientToken="
+ + report.getClientToAMToken() + ", appDiagnostics="
+ + report.getDiagnostics() + ", appMasterHost=" + report.getHost()
+ + ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
+ + report.getRpcPort() + ", appStartTime=" + report.getStartTime()
+ + ", yarnAppState=" + report.getYarnApplicationState().toString()
+ + ", distributedFinalState="
+ + report.getFinalApplicationStatus().toString() + ", appTrackingUrl="
+ + report.getTrackingUrl() + ", appUser=" + report.getUser());
+
+ YarnApplicationState state = report.getYarnApplicationState();
+ if (finalState.contains(state)) {
+ return report;
+ }
+ try {
+ Thread.sleep(sleepTime);
+ sleepTime = count * 100;
+ if(count < 10) {
+ count++;
+ }
+ } catch (InterruptedException e) {
+ //LOG.debug("Thread sleep in monitoring loop interrupted");
+ }
+ }
+ }
+
+ public boolean isQueryMasterStopped(QueryId queryId) {
+ ApplicationId appId = ApplicationIdUtils.queryIdToAppId(queryId);
+ try {
+ ApplicationReport report = yarnClient.getApplicationReport(appId);
+ YarnApplicationState state = report.getYarnApplicationState();
+ return EnumSet.of(
+ YarnApplicationState.FINISHED,
+ YarnApplicationState.KILLED,
+ YarnApplicationState.FAILED).contains(state);
+ } catch (YarnException e) {
+ LOG.error(e.getMessage(), e);
+ return false;
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java b/tajo-core/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java
new file mode 100644
index 0000000..3f48ca5
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
/**
 * Thrown when a request refers to a session id that the master does not recognize.
 */
public class InvalidSessionException extends Exception {

  /**
   * @param sessionId the offending session id; it is quoted in the exception message
   */
  public InvalidSessionException(String sessionId) {
    super(String.format("Invalid session id \"%s\"", sessionId));
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java b/tajo-core/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java
new file mode 100644
index 0000000..686d860
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
/**
 * Thrown when a lookup names a session variable that is not set in the session.
 */
public class NoSuchSessionVariableException extends Exception {

  /**
   * @param varname name of the missing variable; it is quoted in the exception message
   */
  public NoSuchSessionVariableException(String varname) {
    super(String.format("No such session variable \"%s\"", varname));
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
new file mode 100644
index 0000000..4d244bf
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.common.ProtoObject;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.SessionProto;
+
+public class Session implements SessionConstants, ProtoObject<SessionProto> {
+ private final String sessionId;
+ private final String userName;
+ private final Map<String, String> sessionVariables;
+
+ // transient status
+ private volatile long lastAccessTime;
+ private volatile String currentDatabase;
+
+ public Session(String sessionId, String userName, String databaseName) {
+ this.sessionId = sessionId;
+ this.userName = userName;
+ this.lastAccessTime = System.currentTimeMillis();
+ this.sessionVariables = new HashMap<String, String>();
+ selectDatabase(databaseName);
+ }
+
+ public Session(SessionProto proto) {
+ sessionId = proto.getSessionId();
+ userName = proto.getUsername();
+ currentDatabase = proto.getCurrentDatabase();
+ lastAccessTime = proto.getLastAccessTime();
+ Options options = new Options(proto.getVariables());
+ sessionVariables = options.getAllKeyValus();
+ }
+
+ public String getSessionId() {
+ return sessionId;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public void updateLastAccessTime() {
+ lastAccessTime = System.currentTimeMillis();
+ }
+
+ public long getLastAccessTime() {
+ return lastAccessTime;
+ }
+
+ public void setVariable(String name, String value) {
+ synchronized (sessionVariables) {
+ sessionVariables.put(name, value);
+ }
+ }
+
+ public String getVariable(String name) throws NoSuchSessionVariableException {
+ synchronized (sessionVariables) {
+ if (sessionVariables.containsKey(name)) {
+ return sessionVariables.get(name);
+ } else {
+ throw new NoSuchSessionVariableException(name);
+ }
+ }
+ }
+
+ public void removeVariable(String name) {
+ synchronized (sessionVariables) {
+ sessionVariables.remove(name);
+ }
+ }
+
+ public synchronized Map<String, String> getAllVariables() {
+ synchronized (sessionVariables) {
+ return ImmutableMap.copyOf(sessionVariables);
+ }
+ }
+
+ public void selectDatabase(String databaseName) {
+ this.currentDatabase = databaseName;
+ }
+
+ public String getCurrentDatabase() {
+ return this.currentDatabase;
+ }
+
+ @Override
+ public SessionProto getProto() {
+ SessionProto.Builder builder = SessionProto.newBuilder();
+ builder.setSessionId(sessionId);
+ builder.setUsername(userName);
+ builder.setCurrentDatabase(currentDatabase);
+ builder.setLastAccessTime(lastAccessTime);
+ Options variables = new Options();
+ variables.putAll(this.sessionVariables);
+ builder.setVariables(variables.getProto());
+ return builder.build();
+ }
+
+ public String toString() {
+ return "user=" + userName + ",id=" + sessionId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/session/SessionConstants.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionConstants.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionConstants.java
new file mode 100644
index 0000000..46f49a2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionConstants.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
/**
 * Holder for session-related constants, implemented by {@code Session}.
 *
 * <p>Currently declares no members.
 */
public interface SessionConstants {
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEvent.java
new file mode 100644
index 0000000..dce3ba6
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEvent.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class SessionEvent extends AbstractEvent<SessionEventType> {
+ private final String sessionId;
+
+ public SessionEvent(String sessionId, SessionEventType sessionEventType) {
+ super(sessionEventType);
+ this.sessionId = sessionId;
+ }
+
+ public String getSessionId() {
+ return sessionId;
+ }
+}