You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 13:44:21 UTC
[18/57] [abbrv] [partial] TAJO-752: Escalate sub modules in tajo-core
into the top-level modules. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
new file mode 100644
index 0000000..79f14a2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.ContainerProxy;
+import org.apache.tajo.master.TajoContainerProxy;
+import org.apache.tajo.master.TaskRunnerGroupEvent;
+import org.apache.tajo.master.TaskRunnerLauncher;
+import org.apache.tajo.master.event.ContainerAllocationEvent;
+import org.apache.tajo.master.event.ContainerAllocatorEventType;
+import org.apache.tajo.master.event.SubQueryContainerAllocationEvent;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.master.querymaster.SubQueryState;
+import org.apache.tajo.master.rm.TajoWorkerContainer;
+import org.apache.tajo.master.rm.TajoWorkerContainerId;
+import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.util.ApplicationIdUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class TajoResourceAllocator extends AbstractResourceAllocator {
+ private static final Log LOG = LogFactory.getLog(TajoResourceAllocator.class);
+
+ private TajoConf tajoConf;
+ private QueryMasterTask.QueryMasterTaskContext queryTaskContext;
+ private final ExecutorService executorService;
+
+ private AtomicBoolean stopped = new AtomicBoolean(false);
+
+ public TajoResourceAllocator(QueryMasterTask.QueryMasterTaskContext queryTaskContext) {
+ this.queryTaskContext = queryTaskContext;
+ executorService = Executors.newFixedThreadPool(
+ queryTaskContext.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
+ }
+
+ /**
+  * Converts a container-id protobuf message into a {@link TajoWorkerContainerId},
+  * copying the application attempt id and the numeric id from the message.
+  *
+  * @param containerIdProto protobuf form of the container id
+  * @return a new container id carrying the same attempt id and id
+  */
+ @Override
+ public ContainerId makeContainerId(YarnProtos.ContainerIdProto containerIdProto) {
+   ApplicationAttemptId attemptId = new ApplicationAttemptIdPBImpl(containerIdProto.getAppAttemptId());
+
+   TajoWorkerContainerId workerContainerId = new TajoWorkerContainerId();
+   workerContainerId.setApplicationAttemptId(attemptId);
+   workerContainerId.setId(containerIdProto.getId());
+   return workerContainerId;
+ }
+
+ /**
+  * Intentionally a no-op in this allocator: the body is empty.
+  */
+ @Override
+ public void allocateTaskWorker() {
+ }
+
+ /**
+  * Computes how many containers to request for a set of tasks.
+  * The cluster's total memory is divided by the per-task memory to get the number of
+  * slots, one slot is held back for the query master, and the result is capped by
+  * the number of tasks. At least one slot is always assumed.
+  *
+  * @param workerContext   source of the latest cluster resource summary (may report none)
+  * @param numTasks        number of tasks that need a container
+  * @param memoryMBPerTask memory in MB required by one task; a non-positive value is
+  *                        treated as "no usable slot information" instead of dividing by zero
+  * @return {@code min(numTasks, max(1, clusterSlots - 1))}
+  */
+ @Override
+ public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext,
+                                          int numTasks,
+                                          int memoryMBPerTask) {
+   //TODO consider disk slot
+
+   TajoMasterProtocol.ClusterResourceSummary clusterResource = workerContext.getClusterResource();
+   int clusterSlots = 0;
+   // Guard the division: a zero per-task memory would otherwise raise ArithmeticException.
+   if (clusterResource != null && memoryMBPerTask > 0) {
+     clusterSlots = clusterResource.getTotalMemoryMB() / memoryMBPerTask;
+   }
+   clusterSlots = Math.max(1, clusterSlots - 1); // reserve query master slot
+   LOG.info("CalculateNumberRequestContainer - Number of Tasks=" + numTasks +
+       ", Number of Cluster Slots=" + clusterSlots);
+   return Math.min(numTasks, clusterSlots);
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ tajoConf = (TajoConf)conf;
+
+ queryTaskContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, new TajoTaskRunnerLauncher());
+
+ queryTaskContext.getDispatcher().register(ContainerAllocatorEventType.class, new TajoWorkerAllocationHandler());
+
+ super.init(conf);
+ }
+
+ /**
+  * Stops the allocator once: shuts down the launcher pool, asks every known container
+  * proxy to stop, then stops the parent service. Repeated calls return immediately.
+  * A failure to stop one container is logged (with its stack trace) and does not
+  * prevent the remaining containers from being stopped.
+  */
+ @Override
+ public synchronized void stop() {
+   if (stopped.getAndSet(true)) {
+     return;
+   }
+
+   executorService.shutdownNow();
+
+   Map<ContainerId, ContainerProxy> containers = queryTaskContext.getResourceAllocator().getContainers();
+   // Iterate over a copy of the values rather than the live map view
+   // (presumably because stopping a proxy may modify the map; TODO confirm).
+   List<ContainerProxy> list = new ArrayList<ContainerProxy>(containers.values());
+   for (ContainerProxy eachProxy : list) {
+     try {
+       eachProxy.stopContainer();
+     } catch (Exception e) {
+       // Keep the throwable so the cause is not reduced to a (possibly null) message.
+       LOG.warn(e.getMessage(), e);
+     }
+   }
+   super.stop();
+ }
+
+ /**
+  * Starts the service; simply delegates to the parent implementation.
+  */
+ @Override
+ public void start() {
+ super.start();
+ }
+
+ /**
+  * Dispatcher handler for {@link TaskRunnerGroupEvent}s: launches task runners on
+  * {@code CONTAINER_REMOTE_LAUNCH} and stops containers on
+  * {@code CONTAINER_REMOTE_CLEANUP}. Any other event type is ignored.
+  */
+ class TajoTaskRunnerLauncher implements TaskRunnerLauncher {
+   @Override
+   public void handle(TaskRunnerGroupEvent event) {
+     if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_LAUNCH) {
+       launchTaskRunners(event.getExecutionBlockId(), event.getContainers());
+       return;
+     }
+
+     if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_CLEANUP) {
+       stopContainers(event.getContainers());
+     }
+   }
+ }
+
+ /**
+  * Creates a {@link TajoContainerProxy} for each container and submits its launch
+  * to the executor.
+  *
+  * @param executionBlockId execution block the containers were allocated for
+  * @param containers       containers to launch
+  */
+ private void launchTaskRunners(ExecutionBlockId executionBlockId, Collection<Container> containers) {
+   // Query in standby mode doesn't need launch Worker.
+   // But, Assign ExecutionBlock to assigned tajo worker
+   for (Container container : containers) {
+     ContainerProxy proxy = new TajoContainerProxy(queryTaskContext, tajoConf, container, executionBlockId);
+     LaunchRunner launchTask = new LaunchRunner(container.getId(), proxy);
+     executorService.submit(launchTask);
+   }
+ }
+
+ /**
+  * Runnable that launches one container proxy and, at debug level, logs the
+  * container id afterwards.
+  */
+ protected static class LaunchRunner implements Runnable {
+   private final ContainerId containerId;
+   private final ContainerProxy containerProxy;
+
+   /**
+    * @param id    id of the container being launched (used for logging only)
+    * @param proxy proxy whose {@code launch} is invoked
+    */
+   public LaunchRunner(ContainerId id, ContainerProxy proxy) {
+     this.containerId = id;
+     this.containerProxy = proxy;
+   }
+
+   @Override
+   public void run() {
+     containerProxy.launch(null);
+     if (!LOG.isDebugEnabled()) {
+       return;
+     }
+     LOG.debug("ContainerProxy started:" + containerId);
+   }
+ }
+
+ /**
+  * Submits one {@link StopContainerRunner} per container, using the proxy that the
+  * resource allocator has registered under the container's id.
+  *
+  * @param containers containers to stop
+  */
+ private void stopContainers(Collection<Container> containers) {
+   for (Container each : containers) {
+     final ContainerId containerId = each.getId();
+     final ContainerProxy registeredProxy = queryTaskContext.getResourceAllocator().getContainer(containerId);
+     executorService.submit(new StopContainerRunner(containerId, registeredProxy));
+   }
+ }
+
+ /**
+  * Runnable that stops one container through its proxy.
+  * The proxy comes from a lookup by container id in {@code stopContainers} and may
+  * therefore be {@code null}; in that case the request is logged and skipped instead
+  * of failing with a NullPointerException inside the executor thread.
+  */
+ private static class StopContainerRunner implements Runnable {
+   private final ContainerProxy proxy;
+   private final ContainerId id;
+
+   /**
+    * @param id    id of the container to stop
+    * @param proxy proxy registered for that container, or {@code null} if none was found
+    */
+   public StopContainerRunner(ContainerId id, ContainerProxy proxy) {
+     this.id = id;
+     this.proxy = proxy;
+   }
+
+   @Override
+   public void run() {
+     if (proxy == null) {
+       LOG.warn("No ContainerProxy found for container " + id + ", skip stopping");
+       return;
+     }
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("ContainerProxy stopped:" + id + "," + proxy.getId());
+     }
+     proxy.stopContainer();
+   }
+ }
+
+ class TajoWorkerAllocationHandler implements EventHandler<ContainerAllocationEvent> {
+ @Override
+ public void handle(ContainerAllocationEvent event) {
+ executorService.submit(new TajoWorkerAllocationThread(event));
+ }
+ }
+
+ /**
+  * Task that asks the TajoMaster for worker resources on behalf of one
+  * {@link ContainerAllocationEvent} and turns the reply into containers.
+  * Although it extends {@link Thread}, {@link TajoWorkerAllocationHandler} submits
+  * instances to {@code executorService}, so {@link #run()} executes on a pool thread.
+  */
+ class TajoWorkerAllocationThread extends Thread {
+   ContainerAllocationEvent event;
+   TajoWorkerAllocationThread(ContainerAllocationEvent event) {
+     this.event = event;
+   }
+
+   /**
+    * Sends the allocation request, waits for the reply in 3-second slices until the
+    * allocator is stopped, publishes the allocated containers, and re-requests whatever
+    * part of the demand was not satisfied.
+    */
+   @Override
+   public void run() {
+     LOG.info("Start TajoWorkerAllocationThread");
+     CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack =
+         new CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse>();
+
+     TajoMasterProtocol.WorkerResourceAllocationRequest request = buildRequest();
+
+     RpcConnectionPool connPool = RpcConnectionPool.getPool(queryTaskContext.getConf());
+     NettyClientBase tmClient = null;
+     try {
+       tmClient = connPool.getConnection(
+           queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
+           TajoMasterProtocol.class, true);
+       TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+       masterClientService.allocateWorkerResources(null, request, callBack);
+     } catch (Exception e) {
+       LOG.error(e.getMessage(), e);
+     } finally {
+       connPool.releaseConnection(tmClient);
+     }
+
+     TajoMasterProtocol.WorkerResourceAllocationResponse response = null;
+     while(!stopped.get()) {
+       try {
+         response = callBack.get(3, TimeUnit.SECONDS);
+         break;
+       } catch (InterruptedException e) {
+         if(stopped.get()) {
+           return;
+         }
+       } catch (TimeoutException e) {
+         LOG.info("No available worker resource for " + event.getExecutionBlockId());
+         continue;
+       }
+     }
+     int numAllocatedContainers = 0;
+
+     if(response != null) {
+       List<TajoMasterProtocol.WorkerAllocatedResource> allocatedResources = response.getWorkerAllocatedResourceList();
+       ExecutionBlockId executionBlockId = event.getExecutionBlockId();
+
+       List<Container> containers = new ArrayList<Container>();
+       for(TajoMasterProtocol.WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
+         containers.add(toContainer(executionBlockId, eachAllocatedResource));
+       }
+
+       SubQueryState state = queryTaskContext.getSubQuery(executionBlockId).getState();
+       if (!SubQuery.isRunningState(state)) {
+         // The sub query is no longer running: hand the resources straight back.
+         try {
+           List<ContainerId> containerIds = new ArrayList<ContainerId>();
+           for(Container eachContainer: containers) {
+             containerIds.add(eachContainer.getId());
+           }
+           TajoContainerProxy.releaseWorkerResource(queryTaskContext, executionBlockId, containerIds);
+         } catch (Exception e) {
+           LOG.error(e.getMessage(), e);
+         }
+         return;
+       }
+
+       if (allocatedResources.size() > 0) {
+         if(LOG.isDebugEnabled()) {
+           LOG.debug("SubQueryContainerAllocationEvent fire:" + executionBlockId);
+         }
+         queryTaskContext.getEventHandler().handle(new SubQueryContainerAllocationEvent(executionBlockId, containers));
+       }
+       numAllocatedContainers += allocatedResources.size();
+
+     }
+     // Do not re-request once the allocator is stopped: stop() has already shut the
+     // executor down, so a follow-up allocation event could no longer be served.
+     if(!stopped.get() && event.getRequiredNum() > numAllocatedContainers) {
+       ContainerAllocationEvent shortRequestEvent = new ContainerAllocationEvent(
+           event.getType(), event.getExecutionBlockId(), event.getPriority(),
+           event.getResource(),
+           event.getRequiredNum() - numAllocatedContainers,
+           event.isLeafQuery(), event.getProgress()
+       );
+       queryTaskContext.getEventHandler().handle(shortRequestEvent);
+
+     }
+     LOG.info("Stop TajoWorkerAllocationThread");
+   }
+
+   /** Builds the allocation request from the event and the configured per-task defaults. */
+   private TajoMasterProtocol.WorkerResourceAllocationRequest buildRequest() {
+     //TODO consider task's resource usage pattern
+     int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY);
+     float requiredDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TASK_DEFAULT_DISK);
+
+     return TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
+         .setMinMemoryMBPerContainer(requiredMemoryMB)
+         .setMaxMemoryMBPerContainer(requiredMemoryMB)
+         .setNumContainers(event.getRequiredNum())
+         .setResourceRequestPriority(!event.isLeafQuery() ? TajoMasterProtocol.ResourceRequestPriority.MEMORY
+             : TajoMasterProtocol.ResourceRequestPriority.DISK)
+         .setMinDiskSlotPerContainer(requiredDiskSlots)
+         .setMaxDiskSlotPerContainer(requiredDiskSlots)
+         .setQueryId(event.getExecutionBlockId().getQueryId().getProto())
+         .build();
+   }
+
+   /** Converts one allocated resource of the reply into a container for the given execution block. */
+   private TajoWorkerContainer toContainer(ExecutionBlockId executionBlockId,
+                                           TajoMasterProtocol.WorkerAllocatedResource allocatedResource) {
+     TajoWorkerContainer container = new TajoWorkerContainer();
+     NodeId nodeId = NodeId.newInstance(allocatedResource.getWorkerHost(),
+         allocatedResource.getPeerRpcPort());
+
+     TajoWorkerContainerId containerId = new TajoWorkerContainerId();
+
+     containerId.setApplicationAttemptId(
+         ApplicationIdUtils.createApplicationAttemptId(executionBlockId.getQueryId(),
+             allocatedResource.getContainerId().getAppAttemptId().getAttemptId()));
+     containerId.setId(allocatedResource.getContainerId().getId());
+
+     container.setId(containerId);
+     container.setNodeId(nodeId);
+
+     WorkerResource workerResource = new WorkerResource();
+     workerResource.setMemoryMB(allocatedResource.getAllocatedMemoryMB());
+     workerResource.setDiskSlots(allocatedResource.getAllocatedDiskSlots());
+
+     Worker worker = new Worker(null, workerResource);
+     worker.setHostName(nodeId.getHost());
+     worker.setPeerRpcPort(nodeId.getPort());
+     worker.setQueryMasterPort(allocatedResource.getQueryMasterPort());
+     worker.setPullServerPort(allocatedResource.getWorkerPullServerPort());
+
+     container.setWorkerResource(worker);
+     return container;
+   }
+ }
+}
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
new file mode 100644
index 0000000..0b8d6c2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -0,0 +1,583 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.codahale.metrics.Gauge;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.shell.PathData;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.catalog.CatalogClient;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.querymaster.QueryMaster;
+import org.apache.tajo.master.querymaster.QueryMasterManagerService;
+import org.apache.tajo.master.rm.TajoWorkerResourceManager;
+import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.rpc.RpcChannelFactory;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.util.metrics.TajoSystemMetrics;
+import org.apache.tajo.webapp.StaticHttpServer;
+
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.tajo.conf.TajoConf.ConfVars;
+
+public class TajoWorker extends CompositeService {
+ public static final PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
+ public static final PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
+
+ // Values of the first command-line argument that setWorkerMode() recognises.
+ // Any other value falls into setWorkerMode()'s final else branch (task runner only).
+ public static final String WORKER_MODE_YARN_TASKRUNNER = "tr";
+ public static final String WORKER_MODE_YARN_QUERYMASTER = "qm";
+ // "standby" enables both the query master and the task runner role.
+ public static final String WORKER_MODE_STANDBY = "standby";
+ public static final String WORKER_MODE_QUERY_MASTER = "standby-qm";
+ public static final String WORKER_MODE_TASKRUNNER = "standby-tr";
+
+ private static final Log LOG = LogFactory.getLog(TajoWorker.class);
+
+ private TajoConf systemConf;
+
+ private StaticHttpServer webServer;
+
+ private TajoWorkerClientService tajoWorkerClientService;
+
+ private QueryMasterManagerService queryMasterManagerService;
+
+ private TajoWorkerManagerService tajoWorkerManagerService;
+
+ private InetSocketAddress tajoMasterAddress;
+
+ private InetSocketAddress workerResourceTrackerAddr;
+
+ private CatalogClient catalogClient;
+
+ private WorkerContext workerContext;
+
+ private TaskRunnerManager taskRunnerManager;
+
+ private TajoPullServerService pullService;
+
+ private boolean yarnContainerMode;
+
+ private boolean queryMasterMode;
+
+ private boolean taskRunnerMode;
+
+ private WorkerHeartbeatService workerHeartbeatThread;
+
+ private AtomicBoolean stopped = new AtomicBoolean(false);
+
+ private AtomicInteger numClusterNodes = new AtomicInteger();
+
+ private TajoMasterProtocol.ClusterResourceSummary clusterResource;
+
+ private int httpPort;
+
+ private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+
+ private RpcConnectionPool connPool;
+
+ private String[] cmdArgs;
+
+ private DeletionService deletionService;
+
+ private TajoSystemMetrics workerSystemMetrics;
+
+ /**
+  * Creates the worker service, named after this class. Nothing else is set up here;
+  * configuration and sub-services are created in {@link #init(Configuration)}.
+  */
+ public TajoWorker() throws Exception {
+ super(TajoWorker.class.getName());
+ }
+
+ /**
+  * Configures, initialises and starts the worker in one call.
+  *
+  * @param systemConf configuration used for the whole worker
+  * @param args       command-line arguments; args[0], when present, selects the worker mode
+  */
+ public void startWorker(TajoConf systemConf, String[] args) {
+ // systemConf must be assigned before setWorkerMode(), which reads it when args is empty.
+ this.systemConf = systemConf;
+ this.cmdArgs = args;
+ setWorkerMode(args);
+ init(systemConf);
+ start();
+ }
+
+ private void setWorkerMode(String[] args) {
+ if(args.length < 1) {
+ queryMasterMode = systemConf.getBoolean("tajo.worker.mode.querymaster", true);
+ taskRunnerMode = systemConf.getBoolean("tajo.worker.mode.taskrunner", true);
+ } else {
+ if(WORKER_MODE_STANDBY.equals(args[0])) {
+ queryMasterMode = true;
+ taskRunnerMode = true;
+ } else if(WORKER_MODE_YARN_TASKRUNNER.equals(args[0])) {
+ yarnContainerMode = true;
+ queryMasterMode = true;
+ } else if(WORKER_MODE_YARN_QUERYMASTER.equals(args[0])) {
+ yarnContainerMode = true;
+ taskRunnerMode = true;
+ } else if(WORKER_MODE_QUERY_MASTER.equals(args[0])) {
+ yarnContainerMode = false;
+ queryMasterMode = true;
+ } else {
+ yarnContainerMode = false;
+ taskRunnerMode = true;
+ }
+ }
+ if(!queryMasterMode && !taskRunnerMode) {
+ LOG.fatal("Worker daemon exit cause no worker mode(querymaster/taskrunner) property");
+ System.exit(0);
+ }
+ }
+
+ /**
+  * Creates and registers every sub-service of the worker, then initialises them.
+  * Steps, in order: install the JVM shutdown hook; pick fixed or random RPC ports;
+  * add the client, query-master-manager, task-runner-manager and worker-manager
+  * services; outside YARN container mode optionally add the pull server and (unless
+  * running under the test flag) start the info web server and the deletion service;
+  * call the parent {@code init}; resolve master addresses / connect to the catalog
+  * according to the worker mode; finally create the heartbeat service.
+  *
+  * @param conf configuration; must be a {@link TajoConf} because it is cast to one
+  */
+ @Override
+ public void init(Configuration conf) {
+   Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
+
+   this.systemConf = (TajoConf)conf;
+   RackResolver.init(systemConf);
+
+   this.connPool = RpcConnectionPool.getPool(systemConf);
+   this.workerContext = new WorkerContext();
+
+   String resourceManagerClassName = systemConf.getVar(ConfVars.RESOURCE_MANAGER_CLASS);
+
+   // Configured ports are kept only when the resource manager is TajoWorkerResourceManager;
+   // with any other resource manager every port is chosen randomly (port 0).
+   boolean randomPort = !resourceManagerClassName.contains(TajoWorkerResourceManager.class.getName());
+   int clientPort = systemConf.getSocketAddrVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS).getPort();
+   int peerRpcPort = systemConf.getSocketAddrVar(ConfVars.WORKER_PEER_RPC_ADDRESS).getPort();
+   int qmManagerPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_RPC_ADDRESS).getPort();
+
+   if(randomPort) {
+     clientPort = 0;
+     peerRpcPort = 0;
+     qmManagerPort = 0;
+     systemConf.setIntVar(ConfVars.PULLSERVER_PORT, 0);
+   }
+
+   // querymaster worker
+   tajoWorkerClientService = new TajoWorkerClientService(workerContext, clientPort);
+   addService(tajoWorkerClientService);
+
+   queryMasterManagerService = new QueryMasterManagerService(workerContext, qmManagerPort);
+   addService(queryMasterManagerService);
+
+   // taskrunner worker
+   taskRunnerManager = new TaskRunnerManager(workerContext);
+   addService(taskRunnerManager);
+
+   tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort);
+   addService(tajoWorkerManagerService);
+
+   if(!yarnContainerMode) {
+     if(taskRunnerMode) {
+       pullService = new TajoPullServerService();
+       addService(pullService);
+     }
+
+     if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")) {
+       try {
+         httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS).getPort();
+         if(queryMasterMode && !taskRunnerMode) {
+           //If QueryMaster and TaskRunner run on single host, http port conflicts
+           httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_INFO_ADDRESS).getPort();
+         }
+         webServer = StaticHttpServer.getInstance(this ,"worker", null, httpPort ,
+             true, null, systemConf, null);
+         webServer.start();
+         // Read the port back from the server after start.
+         httpPort = webServer.getPort();
+         LOG.info("Worker info server started:" + httpPort);
+
+         // The number of mount points is passed as the deletion service's first argument.
+         deletionService = new DeletionService(getMountPath().size(), 0);
+         if(systemConf.getBoolVar(ConfVars.WORKER_TEMPORAL_DIR_CLEANUP)){
+           getWorkerContext().cleanupTemporalDirectories();
+         }
+       } catch (IOException e) {
+         LOG.error(e.getMessage(), e);
+       }
+     }
+   }
+
+   LOG.info("Tajo Worker started: queryMaster=" + queryMasterMode + " taskRunner=" + taskRunnerMode +
+       ", qmRpcPort=" + qmManagerPort +
+       ",yarnContainer=" + yarnContainerMode + ", clientPort=" + clientPort +
+       ", peerRpcPort=" + peerRpcPort + ":" + qmManagerPort + ",httpPort=" + httpPort);
+
+   super.init(conf);
+
+   if(yarnContainerMode && queryMasterMode) {
+     // In this mode cmdArgs[1] is parsed as the query id and cmdArgs[2] as the master address.
+     tajoMasterAddress = NetUtils.createSocketAddr(cmdArgs[2]);
+     connectToCatalog();
+
+     QueryId queryId = TajoIdUtils.parseQueryId(cmdArgs[1]);
+     queryMasterManagerService.getQueryMaster().reportQueryStatusToQueryMaster(
+         queryId, TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);
+   } else if(yarnContainerMode && taskRunnerMode) { //TaskRunner mode
+     taskRunnerManager.startTask(cmdArgs);
+   } else {
+     tajoMasterAddress = NetUtils.createSocketAddr(systemConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
+     workerResourceTrackerAddr = NetUtils.createSocketAddr(systemConf.getVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS));
+     connectToCatalog();
+   }
+
+   // Created after super.init(), so the heartbeat service is initialised explicitly here.
+   workerHeartbeatThread = new WorkerHeartbeatService(workerContext);
+   workerHeartbeatThread.init(conf);
+   addIfService(workerHeartbeatThread);
+ }
+
+ /**
+  * Starts the worker's metrics system and registers two gauges:
+  * {@code querymaster/runningQueries} and {@code task/runningTasks}.
+  * Each gauge reports 0 while the service it reads from is not set.
+  */
+ private void initWorkerMetrics() {
+   workerSystemMetrics = new TajoSystemMetrics(systemConf, "worker", workerContext.getWorkerName());
+   workerSystemMetrics.start();
+
+   Gauge<Integer> runningQueries = new Gauge<Integer>() {
+     @Override
+     public Integer getValue() {
+       if (queryMasterManagerService == null) {
+         return 0;
+       }
+       return queryMasterManagerService.getQueryMaster().getQueryMasterTasks().size();
+     }
+   };
+   workerSystemMetrics.register("querymaster", "runningQueries", runningQueries);
+
+   Gauge<Integer> runningTasks = new Gauge<Integer>() {
+     @Override
+     public Integer getValue() {
+       if (taskRunnerManager == null) {
+         return 0;
+       }
+       return taskRunnerManager.getNumTasks();
+     }
+   };
+   workerSystemMetrics.register("task", "runningTasks", runningTasks);
+ }
+
+ /**
+  * @return the context object created in {@link #init(Configuration)};
+  *         {@code null} before {@code init} has run
+  */
+ public WorkerContext getWorkerContext() {
+ return workerContext;
+ }
+
+ /**
+  * Starts the sub-services via the parent implementation, then starts the metrics system.
+  */
+ @Override
+ public void start() {
+ super.start();
+ // Runs after super.start(); initWorkerMetrics() asks the worker context for the worker name.
+ initWorkerMetrics();
+ }
+
+ /**
+  * Stops the worker once; repeated calls return immediately. Shuts down, in order:
+  * the info web server, the catalog client, the RPC connection pool and channel
+  * factory, the metrics system, the deletion service and finally the sub-services.
+  * Failures while stopping the web server are logged and do not abort the shutdown.
+  */
+ @Override
+ public void stop() {
+   if(stopped.getAndSet(true)) {
+     return;
+   }
+
+   if(webServer != null) {
+     try {
+       webServer.stop();
+     } catch (Exception e) {
+       LOG.error(e.getMessage(), e);
+     }
+   }
+
+   if (catalogClient != null) {
+     catalogClient.close();
+   }
+
+   if(connPool != null) {
+     connPool.shutdown();
+     RpcChannelFactory.shutdown();
+   }
+
+   // Second attempt in case the server is still alive after the first stop().
+   if(webServer != null && webServer.isAlive()) {
+     try {
+       webServer.stop();
+     } catch (Exception e) {
+       // Best effort: keep shutting down, but do not hide the failure.
+       LOG.warn("Failed to stop worker info server: " + e.getMessage(), e);
+     }
+   }
+
+   if(workerSystemMetrics != null) {
+     workerSystemMetrics.stop();
+   }
+
+   if(deletionService != null) deletionService.stop();
+   super.stop();
+   LOG.info("TajoWorker main thread exiting");
+ }
+
+ /**
+  * View of the enclosing {@link TajoWorker} handed to its sub-services. It exposes the
+  * worker's services, mode flags, addresses and cluster information, and offers helpers
+  * for stopping the worker and for cleaning up temporary directories.
+  */
+ public class WorkerContext {
+   /** @return the query master, or {@code null} when no query master manager service exists */
+   public QueryMaster getQueryMaster() {
+     if (queryMasterManagerService != null) {
+       return queryMasterManagerService.getQueryMaster();
+     }
+     return null;
+   }
+
+   public TajoWorkerManagerService getTajoWorkerManagerService() {
+     return tajoWorkerManagerService;
+   }
+
+   public QueryMasterManagerService getQueryMasterManagerService() {
+     return queryMasterManagerService;
+   }
+
+   public TajoWorkerClientService getTajoWorkerClientService() {
+     return tajoWorkerClientService;
+   }
+
+   public TaskRunnerManager getTaskRunnerManager() {
+     return taskRunnerManager;
+   }
+
+   public CatalogService getCatalog() {
+     return catalogClient;
+   }
+
+   public TajoPullServerService getPullService() {
+     return pullService;
+   }
+
+   public int getHttpPort() {
+     return httpPort;
+   }
+
+   /**
+    * @return host-and-port of the query master manager service in query master mode,
+    *         otherwise host-and-port of the worker manager service
+    */
+   public String getWorkerName() {
+     if (!queryMasterMode) {
+       return getTajoWorkerManagerService().getHostAndPort();
+     }
+     return getQueryMasterManagerService().getHostAndPort();
+   }
+
+   /**
+    * Stops the worker and, when {@code force} is set, exits the JVM with status 0.
+    */
+   public void stopWorker(boolean force) {
+     stop();
+     if (!force) {
+       return;
+     }
+     System.exit(0);
+   }
+
+   /**
+    * Schedules deletion of {@code strPath} under every local temporary directory.
+    * Does nothing when no deletion service exists; I/O errors are logged.
+    */
+   protected void cleanup(String strPath) {
+     if (deletionService != null) {
+       LocalDirAllocator dirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+
+       try {
+         Iterable<Path> localPaths = dirAllocator.getAllLocalPathsToRead(strPath, systemConf);
+         FileSystem localFileSystem = FileSystem.getLocal(systemConf);
+         for (Path localPath : localPaths) {
+           Path qualified = localFileSystem.makeQualified(localPath);
+           deletionService.delete(qualified);
+         }
+       } catch (IOException e) {
+         LOG.error(e.getMessage(), e);
+       }
+     }
+   }
+
+   /**
+    * Schedules deletion of everything directly inside each local temporary directory.
+    * Does nothing when no deletion service exists; I/O errors are logged.
+    */
+   protected void cleanupTemporalDirectories() {
+     if (deletionService != null) {
+       LocalDirAllocator dirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+
+       try {
+         Iterable<Path> tempRoots = dirAllocator.getAllLocalPathsToRead(".", systemConf);
+         FileSystem localFileSystem = FileSystem.getLocal(systemConf);
+         for (Path tempRoot : tempRoots) {
+           String childPattern = localFileSystem.makeQualified(new Path(tempRoot, "*")).toString();
+           PathData[] children = PathData.expandAsGlob(childPattern, systemConf);
+
+           // Nothing matched under this directory: no deletion to schedule.
+           if (children.length == 0) {
+             continue;
+           }
+
+           Path[] targets = new Path[children.length];
+           for (int i = 0; i < children.length; i++) {
+             targets[i] = children[i].path;
+           }
+           deletionService.delete(null, targets);
+         }
+       } catch (IOException e) {
+         LOG.error(e.getMessage(), e);
+       }
+     }
+   }
+
+   public boolean isYarnContainerMode() {
+     return yarnContainerMode;
+   }
+
+   public void setNumClusterNodes(int numClusterNodes) {
+     TajoWorker.this.numClusterNodes.set(numClusterNodes);
+   }
+
+   public int getNumClusterNodes() {
+     return TajoWorker.this.numClusterNodes.get();
+   }
+
+   /** Replaces the cluster resource summary; guarded by the {@code numClusterNodes} monitor. */
+   public void setClusterResource(TajoMasterProtocol.ClusterResourceSummary clusterResource) {
+     synchronized (numClusterNodes) {
+       TajoWorker.this.clusterResource = clusterResource;
+     }
+   }
+
+   /** Reads the cluster resource summary under the same monitor used by the setter. */
+   public TajoMasterProtocol.ClusterResourceSummary getClusterResource() {
+     synchronized (numClusterNodes) {
+       return TajoWorker.this.clusterResource;
+     }
+   }
+
+   public InetSocketAddress getTajoMasterAddress() {
+     return tajoMasterAddress;
+   }
+
+   public InetSocketAddress getResourceTrackerAddress() {
+     return workerResourceTrackerAddr;
+   }
+
+   /** @return the worker manager service's bound port, or 0 when that service is absent */
+   public int getPeerRpcPort() {
+     if (getTajoWorkerManagerService() == null) {
+       return 0;
+     }
+     return getTajoWorkerManagerService().getBindAddr().getPort();
+   }
+
+   public boolean isQueryMasterMode() {
+     return queryMasterMode;
+   }
+
+   public boolean isTaskRunnerMode() {
+     return taskRunnerMode;
+   }
+
+   public TajoSystemMetrics getWorkerSystemMetrics() {
+     return workerSystemMetrics;
+   }
+ }
+
+ /**
+  * Stops the worker by calling {@link #stop()}. Despite the name, nothing beyond
+  * {@code stop()} is done here (the JVM is not exited).
+  */
+ public void stopWorkerForce() {
+ stop();
+ }
+
+ /**
+  * Creates the catalog client from the system configuration.
+  * An IOException is logged and otherwise ignored, leaving {@code catalogClient} unassigned.
+  */
+ private void connectToCatalog() {
+ try {
+ catalogClient = new CatalogClient(systemConf);
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ /**
+  * JVM shutdown hook body: stops the worker unless it has already been stopped.
+  */
+ private class ShutdownHook implements Runnable {
+   @Override
+   public void run() {
+     if (stopped.get()) {
+       return;
+     }
+
+     final String banner = "============================================";
+     LOG.info(banner);
+     LOG.info("TajoWorker received SIGINT Signal");
+     LOG.info(banner);
+     stop();
+   }
+ }
+
+ /**
+  * Formats a thread for the dump: {@code "<id> (<name>)"}, or just the id when the
+  * name is {@code null}.
+  */
+ String getThreadTaskName(long id, String name) {
+   return name == null ? Long.toString(id) : id + " (" + name + ")";
+ }
+
+ /**
+  * Writes a textual dump of all live threads (state, blocked/waited counters, lock
+  * information and up to 20 stack frames each) to the given writer.
+  *
+  * @param writer destination of the dump
+  */
+ public void dumpThread(Writer writer) {
+   final PrintWriter out = new PrintWriter(writer);
+   final int maxStackDepth = 20;
+   final boolean contentionMonitored = threadBean.isThreadContentionMonitoringEnabled();
+   final long[] allThreadIds = threadBean.getAllThreadIds();
+
+   out.println("Process Thread Dump: Tajo Worker");
+   out.println(allThreadIds.length + " active threads");
+
+   for (int i = 0; i < allThreadIds.length; i++) {
+     final ThreadInfo threadInfo = threadBean.getThreadInfo(allThreadIds[i], maxStackDepth);
+     if (threadInfo == null) {
+       // No info was returned for this id.
+       out.println(" Inactive");
+       continue;
+     }
+
+     out.println("Thread " + getThreadTaskName(threadInfo.getThreadId(), threadInfo.getThreadName()) + ":");
+
+     final Thread.State threadState = threadInfo.getThreadState();
+     out.println(" State: " + threadState + ", Blocked count: " + threadInfo.getBlockedCount() +
+         ", Waited count: " + threadInfo.getWaitedCount());
+     if (contentionMonitored) {
+       out.println(" Blocked time: " + threadInfo.getBlockedTime() + ", Waited time: " + threadInfo.getWaitedTime());
+     }
+
+     if (threadState == Thread.State.BLOCKED) {
+       out.println(" Blocked on " + threadInfo.getLockName() +
+           ", Blocked by " + getThreadTaskName(threadInfo.getLockOwnerId(), threadInfo.getLockOwnerName()));
+     } else if (threadState == Thread.State.WAITING) {
+       out.println(" Waiting on " + threadInfo.getLockName());
+     }
+
+     out.println(" Stack:");
+     final StackTraceElement[] frames = threadInfo.getStackTrace();
+     for (int f = 0; f < frames.length; f++) {
+       out.println(" " + frames[f].toString());
+     }
+     out.println("");
+   }
+ }
+
+ /**
+  * Lists the mount points reported by the {@code mount} command.
+  * Each output line is expected to look like {@code "<device> on <path> ..."}; the path
+  * is the text between {@code " on "} and the next space (or the end of the line).
+  * Lines that do not contain {@code " on /"} are skipped. Mount points whose path
+  * contains a space are truncated at that space.
+  *
+  * @return the mount points found, in output order; possibly empty
+  * @throws IOException if the command cannot be started or its output cannot be read
+  */
+ public static List<File> getMountPath() throws IOException {
+   final String marker = " on /";
+   BufferedReader mountOutput = null;
+   try {
+     Process mountProcess = Runtime.getRuntime ().exec("mount");
+     mountOutput = new BufferedReader(new InputStreamReader(mountProcess.getInputStream()));
+     List<File> mountPaths = new ArrayList<File>();
+     String line;
+     while ((line = mountOutput.readLine()) != null) {
+       int indexStart = line.indexOf(marker);
+       if (indexStart < 0) {
+         // Not a mount entry (blank line, header, unexpected format): ignore it.
+         continue;
+       }
+
+       // Skip over " on " so the path starts at its leading '/'.
+       int pathStart = indexStart + 4;
+       int indexEnd = line.indexOf(" ", pathStart);
+       if (indexEnd < 0) {
+         // Nothing follows the path on this line.
+         indexEnd = line.length();
+       }
+
+       mountPaths.add(new File(line.substring(pathStart, indexEnd)));
+     }
+     return mountPaths;
+   } finally {
+     if(mountOutput != null) {
+       mountOutput.close();
+     }
+   }
+ }
+
+ /**
+  * Daemon entry point: loads the system configuration file, then creates and starts a
+  * worker. Any failure during start-up is logged as fatal and the JVM exits with -1.
+  *
+  * @param args command-line arguments, passed on to {@link #startWorker(TajoConf, String[])}
+  */
+ public static void main(String[] args) throws Exception {
+   StringUtils.startupShutdownMessage(TajoWorker.class, args, LOG);
+
+   final TajoConf workerConf = new TajoConf();
+   workerConf.addResource(new Path(TajoConstants.SYSTEM_CONF_FILENAME));
+
+   try {
+     new TajoWorker().startWorker(workerConf, args);
+   } catch (Throwable t) {
+     LOG.fatal("Error starting TajoWorker", t);
+     System.exit(-1);
+   }
+ }
+}
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
new file mode 100644
index 0000000..937d886
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.QueryMasterClientProtocol;
+import org.apache.tajo.master.querymaster.Query;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.rpc.BlockingRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.NetUtils;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * Service that exposes the {@link QueryMasterClientProtocol} RPC endpoint of a worker.
+ * Clients use it to ask the query master hosted by this worker for the status and
+ * the result of a query.
+ */
+public class TajoWorkerClientService extends AbstractService {
+  private static final Log LOG = LogFactory.getLog(TajoWorkerClientService.class);
+  private final PrimitiveProtos.BoolProto BOOL_TRUE =
+      PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
+  private final PrimitiveProtos.BoolProto BOOL_FALSE =
+      PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
+
+  private BlockingRpcServer rpcServer;
+  private InetSocketAddress bindAddr;
+  private String addr;
+  private int port;
+  private TajoConf conf;
+  private TajoWorker.WorkerContext workerContext;
+  private TajoWorkerClientProtocolServiceHandler serviceHandler;
+
+  /**
+   * @param workerContext context of the worker that owns this service
+   * @param port port the RPC server is asked to listen on
+   */
+  public TajoWorkerClientService(TajoWorker.WorkerContext workerContext, int port) {
+    super(TajoWorkerClientService.class.getName());
+
+    this.port = port;
+    this.workerContext = workerContext;
+  }
+
+  /**
+   * Creates and starts the RPC server on {@code 0.0.0.0:port} and records the address it
+   * actually listens on. A failure while starting the server is logged and initialization
+   * continues; in that case {@link #getBindAddr()} keeps returning {@code null}.
+   *
+   * @param conf must be a {@link TajoConf}
+   */
+  @Override
+  public void init(Configuration conf) {
+    Preconditions.checkArgument(conf instanceof TajoConf);
+    this.conf = (TajoConf) conf;
+    this.serviceHandler = new TajoWorkerClientProtocolServiceHandler();
+
+    // The RPC server is started here rather than in start() because the heartbeat thread uses bindAddr.
+    try {
+      // TODO initial port num is value of config and find unused port with sequence
+      InetSocketAddress initIsa = new InetSocketAddress("0.0.0.0", port);
+      if (initIsa.getAddress() == null) {
+        throw new IllegalArgumentException("Failed resolve of " + initIsa);
+      }
+
+      // TODO blocking/non-blocking??
+      int workerNum = this.conf.getIntVar(TajoConf.ConfVars.WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM);
+      this.rpcServer = new BlockingRpcServer(QueryMasterClientProtocol.class, serviceHandler, initIsa, workerNum);
+      this.rpcServer.start();
+
+      this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
+      this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
+
+      // Remember the port reported by the started server.
+      this.port = bindAddr.getPort();
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+    LOG.info(TajoWorkerClientService.class.getSimpleName() + " is bind to " + addr);
+
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  /** Shuts down the RPC server, if one was created, and stops the service. */
+  @Override
+  public void stop() {
+    LOG.info("TajoWorkerClientService stopping");
+    if (rpcServer != null) {
+      rpcServer.shutdown();
+    }
+    LOG.info("TajoWorkerClientService stopped");
+    super.stop();
+  }
+
+  /** @return the address the RPC server listens on, or {@code null} if it was never started */
+  public InetSocketAddress getBindAddr() {
+    return bindAddr;
+  }
+
+  /** Blocking RPC handler that answers client requests from the worker's query master state. */
+  public class TajoWorkerClientProtocolServiceHandler
+      implements QueryMasterClientProtocol.QueryMasterClientProtocolService.BlockingInterface {
+    /** Not implemented: always returns {@code null}. */
+    @Override
+    public PrimitiveProtos.BoolProto updateSessionVariables(
+        RpcController controller,
+        ClientProtos.UpdateSessionVariableRequest request) throws ServiceException {
+      return null;
+    }
+
+    /**
+     * Builds the result response for a query. A succeeded query yields its result table
+     * description; any other outcome yields an error message describing why no result
+     * is available (unknown query, failed query, or query not finished yet).
+     */
+    @Override
+    public ClientProtos.GetQueryResultResponse getQueryResult(
+        RpcController controller,
+        ClientProtos.GetQueryResultRequest request) throws ServiceException {
+      QueryId queryId = new QueryId(request.getQueryId());
+      // getQueryStatus() null-checks this same lookup, so an unknown query must not be dereferenced here.
+      QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true);
+      Query query = queryMasterTask == null ? null : queryMasterTask.getQuery();
+
+      ClientProtos.GetQueryResultResponse.Builder builder = ClientProtos.GetQueryResultResponse.newBuilder();
+      try {
+        builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName());
+      } catch (IOException e) {
+        LOG.warn("Can't get current user name");
+      }
+
+      if (query == null) {
+        builder.setErrorMessage("No Query for " + queryId);
+      } else {
+        switch (query.getState()) {
+          case QUERY_SUCCEEDED:
+            builder.setTableDesc(query.getResultDesc().getProto());
+            break;
+          case QUERY_FAILED:
+          case QUERY_ERROR:
+            builder.setErrorMessage("Query " + queryId + " is failed");
+            // Without this break the failure message was overwritten by the "still running" one.
+            break;
+          default:
+            builder.setErrorMessage("Query " + queryId + " is still running");
+            break;
+        }
+      }
+      return builder.build();
+    }
+
+    /**
+     * Reports the state of a query. The null query id is answered as succeeded; a query
+     * unknown to this query master is answered as {@code QUERY_NOT_ASSIGNED}.
+     */
+    @Override
+    public ClientProtos.GetQueryStatusResponse getQueryStatus(
+        RpcController controller,
+        ClientProtos.GetQueryStatusRequest request) throws ServiceException {
+      ClientProtos.GetQueryStatusResponse.Builder builder
+          = ClientProtos.GetQueryStatusResponse.newBuilder();
+      QueryId queryId = new QueryId(request.getQueryId());
+
+      builder.setQueryId(request.getQueryId());
+
+      if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
+        builder.setResultCode(ClientProtos.ResultCode.OK);
+        builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
+      } else {
+        QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId);
+
+        builder.setResultCode(ClientProtos.ResultCode.OK);
+        builder.setQueryMasterHost(bindAddr.getHostName());
+        builder.setQueryMasterPort(bindAddr.getPort());
+
+        // Retry the lookup with the two-argument variant before giving up.
+        if (queryMasterTask == null) {
+          queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true);
+        }
+        if (queryMasterTask == null) {
+          builder.setState(TajoProtos.QueryState.QUERY_NOT_ASSIGNED);
+          return builder.build();
+        }
+
+        // CREATE TABLE and INSERT queries are reported as having no result.
+        builder.setHasResult(
+            !(queryMasterTask.getQueryTaskContext().getQueryContext().isCreateTable() ||
+                queryMasterTask.getQueryTaskContext().getQueryContext().isInsert())
+        );
+
+        queryMasterTask.touchSessionTime();
+        Query query = queryMasterTask.getQuery();
+
+        if (query != null) {
+          builder.setState(query.getState());
+          builder.setProgress(query.getProgress());
+          builder.setSubmitTime(query.getAppSubmitTime());
+          if (query.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+            builder.setFinishTime(query.getFinishTime());
+          } else {
+            // Any other state reports the current time as the finish time.
+            builder.setFinishTime(System.currentTimeMillis());
+          }
+        } else {
+          builder.setState(queryMasterTask.getState());
+          builder.setErrorMessage(queryMasterTask.getErrorMessage());
+        }
+      }
+      return builder.build();
+    }
+
+    /** Logs the close request and acknowledges it; no other action is taken here. */
+    @Override
+    public PrimitiveProtos.BoolProto closeQuery(
+        RpcController controller,
+        TajoIdProtos.QueryIdProto request) throws ServiceException {
+      final QueryId queryId = new QueryId(request);
+      LOG.info("Stop Query:" + queryId);
+      return BOOL_TRUE;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
new file mode 100644
index 0000000..392a7cf
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.rpc.AsyncRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.NetUtils;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Service that exposes the {@link TajoWorkerProtocol} RPC endpoint of a worker. It accepts
+ * requests to run execution blocks, kill task attempts and clean up after queries.
+ */
+public class TajoWorkerManagerService extends CompositeService
+    implements TajoWorkerProtocol.TajoWorkerProtocolService.Interface {
+  private static final Log LOG = LogFactory.getLog(TajoWorkerManagerService.class.getName());
+
+  private AsyncRpcServer rpcServer;
+  private InetSocketAddress bindAddr;
+  private String addr;
+  private int port;
+
+  private TajoWorker.WorkerContext workerContext;
+
+  /**
+   * @param workerContext context of the worker that owns this service
+   * @param port port the RPC server is asked to listen on
+   */
+  public TajoWorkerManagerService(TajoWorker.WorkerContext workerContext, int port) {
+    super(TajoWorkerManagerService.class.getName());
+    this.workerContext = workerContext;
+    this.port = port;
+  }
+
+  /**
+   * Creates and starts the RPC server on {@code 0.0.0.0:port}, records the address it
+   * listens on and publishes it as {@code WORKER_PEER_RPC_ADDRESS} in the configuration.
+   * A failure while starting the server is only logged.
+   *
+   * @param conf must be a {@link TajoConf}
+   */
+  @Override
+  public void init(Configuration conf) {
+    Preconditions.checkArgument(conf instanceof TajoConf);
+    TajoConf tajoConf = (TajoConf) conf;
+    try {
+      // Setup RPC server
+      InetSocketAddress initIsa =
+          new InetSocketAddress("0.0.0.0", port);
+      if (initIsa.getAddress() == null) {
+        throw new IllegalArgumentException("Failed resolve of " + initIsa);
+      }
+
+      int workerNum = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_RPC_SERVER_WORKER_THREAD_NUM);
+      this.rpcServer = new AsyncRpcServer(TajoWorkerProtocol.class, this, initIsa, workerNum);
+      this.rpcServer.start();
+
+      this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
+      this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
+
+      // Remember the port reported by the started server.
+      this.port = bindAddr.getPort();
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+    LOG.info("TajoWorkerManagerService is bind to " + addr);
+    tajoConf.setVar(TajoConf.ConfVars.WORKER_PEER_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddr));
+    super.init(tajoConf);
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  /** Shuts down the RPC server, if one was created, and stops the service. */
+  @Override
+  public void stop() {
+    if (rpcServer != null) {
+      rpcServer.shutdown();
+    }
+    LOG.info("TajoWorkerManagerService stopped");
+    super.stop();
+  }
+
+  /** @return the address the RPC server listens on */
+  public InetSocketAddress getBindAddr() {
+    return bindAddr;
+  }
+
+  /** @return the bind address formatted as {@code host:port} */
+  public String getHostAndPort() {
+    return bindAddr.getHostName() + ":" + bindAddr.getPort();
+  }
+
+  /** Liveness check: always answers with the true proto. */
+  @Override
+  public void ping(RpcController controller,
+                   TajoIdProtos.QueryUnitAttemptIdProto attemptId,
+                   RpcCallback<PrimitiveProtos.BoolProto> done) {
+    done.run(TajoWorker.TRUE_PROTO);
+  }
+
+  /**
+   * Starts a task runner for the requested execution block. The request fields are
+   * handed to the task runner manager as a positional string array. The callback
+   * receives the true proto on success and the false proto if starting failed.
+   */
+  @Override
+  public void executeExecutionBlock(RpcController controller,
+                                    TajoWorkerProtocol.RunExecutionBlockRequestProto request,
+                                    RpcCallback<PrimitiveProtos.BoolProto> done) {
+    workerContext.getWorkerSystemMetrics().counter("query", "executedExecutionBlocksNum").inc();
+    try {
+      String[] params = new String[7];
+      params[0] = "standby";  //mode(never used)
+      params[1] = request.getExecutionBlockId();
+      // NodeId has a form of hostname:port.
+      params[2] = request.getNodeId();
+      params[3] = request.getContainerId();
+
+      // QueryMaster's address
+      params[4] = request.getQueryMasterHost();
+      params[5] = String.valueOf(request.getQueryMasterPort());
+      params[6] = request.getQueryOutputPath();
+      workerContext.getTaskRunnerManager().startTask(params);
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+
+  /**
+   * Kills the task attempt identified by the request, if this worker still knows it.
+   * The callback is always answered, even when no matching task is found.
+   */
+  @Override
+  public void killTaskAttempt(RpcController controller, TajoIdProtos.QueryUnitAttemptIdProto request,
+                              RpcCallback<PrimitiveProtos.BoolProto> done) {
+    Task task = workerContext.getTaskRunnerManager().findTaskByQueryUnitAttemptId(new QueryUnitAttemptId(request));
+    // The lookup may find nothing; guard so the caller still gets an answer instead of an NPE here.
+    if (task != null) {
+      task.kill();
+    }
+    done.run(TajoWorker.TRUE_PROTO);
+  }
+
+  /** Asks the worker context to clean up the resources kept for the given query. */
+  @Override
+  public void cleanup(RpcController controller, TajoIdProtos.QueryIdProto request,
+                      RpcCallback<PrimitiveProtos.BoolProto> done) {
+    workerContext.cleanup(new QueryId(request).toString());
+    done.run(TajoWorker.TRUE_PROTO);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
new file mode 100644
index 0000000..30f56ee
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -0,0 +1,762 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.logical.SortNode;
+import org.apache.tajo.engine.planner.physical.PhysicalExec;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.engine.query.QueryUnitRequest;
+import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
+import org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcChannelFactory;
+import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.ApplicationIdUtils;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.text.NumberFormat;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
+public class Task {
+ private static final Log LOG = LogFactory.getLog(Task.class);
+ // Progress value reported once the fetch phase is complete (see run() and fetcherFinished()).
+ private static final float FETCHER_PROGRESS = 0.5f;
+
+ private final TajoConf systemConf;
+ private final QueryContext queryContext;
+ private final FileSystem localFS;
+ private TaskRunner.TaskRunnerContext taskRunnerContext;
+ private final QueryMasterProtocolService.Interface masterProxy;
+ private final LocalDirAllocator lDirAllocator;
+ private final QueryUnitAttemptId taskId;
+
+ private final Path taskDir;
+ private final QueryUnitRequest request;
+ private TaskAttemptContext context;
+ private List<Fetcher> fetcherRunners;
+ private LogicalNode plan;
+ // Table descriptors of the plan's scan nodes, keyed by the scan's canonical name.
+ private final Map<String, TableDesc> descs = Maps.newHashMap();
+ private PhysicalExec executor;
+ private boolean interQuery;
+ // Set by kill(), abort() (or a failure in run()) and the end of run() respectively.
+ private boolean killed = false;
+ private boolean aborted = false;
+ private boolean stopped = false;
+ private final Reporter reporter;
+ private Path inputTableBaseDir;
+
+ // Counters shared by all Task instances (static); run() increments them without synchronization.
+ private static int completedTasksNum = 0;
+ private static int succeededTasksNum = 0;
+ private static int killedTasksNum = 0;
+ private static int failedTasksNum = 0;
+
+ private long startTime;
+ private long finishTime;
+
+ private final TableStats inputStats;
+
+ // TODO - to be refactored
+ private ShuffleType shuffleType = null;
+ private Schema finalSchema = null;
+ private TupleComparator sortComp = null;
+ private ClientSocketChannelFactory channelFactory = null;
+
+ static final String OUTPUT_FILE_PREFIX="part-";
+ // Per-thread formatter for the execution block id part of an output file name:
+ // no digit grouping, at least two digits.
+ static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY =
+ new ThreadLocal<NumberFormat>() {
+ @Override
+ public NumberFormat initialValue() {
+ NumberFormat fmt = NumberFormat.getInstance();
+ fmt.setGroupingUsed(false);
+ fmt.setMinimumIntegerDigits(2);
+ return fmt;
+ }
+ };
+ // Per-thread formatter for the query unit id part of an output file name:
+ // no digit grouping, at least six digits.
+ static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK =
+ new ThreadLocal<NumberFormat>() {
+ @Override
+ public NumberFormat initialValue() {
+ NumberFormat fmt = NumberFormat.getInstance();
+ fmt.setGroupingUsed(false);
+ fmt.setMinimumIntegerDigits(6);
+ return fmt;
+ }
+ };
+
+ /**
+  * Creates a task for one query unit attempt: builds its attempt context, starts the
+  * status reporter, deserializes the logical plan and, for a non-intermediate task,
+  * decides the final output file path. The task is left in the TA_PENDING state.
+  *
+  * @param taskId id of the attempt executed by this task
+  * @param worker runner context supplying configuration, local file system and directories
+  * @param masterProxy proxy used to report to the query master
+  * @param request the query unit request describing fragments, fetches and the plan
+  * @throws IOException declared by the signature; see the calls below for possible sources
+  */
+ public Task(QueryUnitAttemptId taskId,
+ final TaskRunner.TaskRunnerContext worker,
+ final QueryMasterProtocolService.Interface masterProxy,
+ final QueryUnitRequest request) throws IOException {
+ this.request = request;
+ this.taskId = taskId;
+
+ this.systemConf = worker.getConf();
+ this.queryContext = request.getQueryContext();
+ this.taskRunnerContext = worker;
+ this.masterProxy = masterProxy;
+ this.localFS = worker.getLocalFS();
+ this.lDirAllocator = worker.getLocalDirAllocator();
+ // The task directory is <base dir>/<query unit id>_<attempt id>.
+ this.taskDir = StorageUtil.concatPath(taskRunnerContext.getBaseDir(),
+ taskId.getQueryUnitId().getId() + "_" + taskId.getId());
+
+ this.context = new TaskAttemptContext(systemConf, taskId,
+ request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir);
+ this.context.setDataChannel(request.getDataChannel());
+ this.context.setEnforcer(request.getEnforcer());
+ this.inputStats = new TableStats();
+
+ // The reporter's communication thread is started here, before the plan is deserialized.
+ this.reporter = new Reporter(taskId, masterProxy);
+ this.reporter.startCommunicationThread();
+
+ // Deserialize the plan and remember the table descriptor of every scan node.
+ plan = CoreGsonHelper.fromJson(request.getSerializedData(), LogicalNode.class);
+ LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
+ for (LogicalNode node : scanNode) {
+ ScanNode scan = (ScanNode)node;
+ descs.put(scan.getCanonicalName(), scan.getTableDesc());
+ }
+
+ interQuery = request.getProto().getInterQuery();
+ if (interQuery) {
+ context.setInterQuery();
+ this.shuffleType = context.getDataChannel().getShuffleType();
+
+ // For a range shuffle, derive the schema and comparator from the top-most sort node.
+ if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
+ SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT);
+ this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
+ this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys());
+ }
+ } else {
+ // The final result of a task will be written in a file named part-ss-nnnnnnn,
+ // where ss is the subquery id associated with this task, and nnnnnn is the task id.
+ Path outFilePath = StorageUtil.concatPath(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME,
+ OUTPUT_FILE_PREFIX +
+ OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskId.getQueryUnitId().getExecutionBlockId().getId()) + "-" +
+ OUTPUT_FILE_FORMAT_TASK.get().format(taskId.getQueryUnitId().getId()));
+ LOG.info("Output File Path: " + outFilePath);
+ context.setOutputPath(outFilePath);
+ }
+
+ context.setState(TaskAttemptState.TA_PENDING);
+ // Log a summary of the initialized task.
+ LOG.info("==================================");
+ LOG.info("* Subquery " + request.getId() + " is initialized");
+ LOG.info("* InterQuery: " + interQuery
+ + (interQuery ? ", Use " + this.shuffleType + " shuffle":""));
+
+ LOG.info("* Fragments (num: " + request.getFragments().size() + ")");
+ LOG.info("* Fetches (total:" + request.getFetches().size() + ") :");
+ for (Fetch f : request.getFetches()) {
+ LOG.info("Table Id: " + f.getName() + ", url: " + f.getUrls());
+ }
+ LOG.info("* Local task dir: " + taskDir);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("* plan:\n");
+ LOG.debug(plan.toString());
+ }
+ LOG.info("==================================");
+ }
+
+ /**
+  * Prepares the local directories of this task. The task directory is always created;
+  * when the request carries fetches, a base directory plus one sub-directory per input
+  * table is created as well. Finally the fetchers for the request are built.
+  *
+  * @throws IOException if a directory cannot be created or the fetchers cannot be set up
+  */
+ public void init() throws IOException {
+   // initialize a task temporal dir
+   localFS.mkdirs(taskDir);
+
+   final boolean needsFetch = !request.getFetches().isEmpty();
+   if (needsFetch) {
+     final String attemptDir = getTaskAttemptDir(context.getTaskId()).toString();
+     inputTableBaseDir = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(attemptDir, systemConf));
+     localFS.mkdirs(inputTableBaseDir);
+
+     for (String inputTable : context.getInputTables()) {
+       final Path tableDir = new Path(inputTableBaseDir, inputTable);
+       if (localFS.exists(tableDir)) {
+         continue;
+       }
+       LOG.info("the directory is created " + tableDir.toUri());
+       localFS.mkdirs(tableDir);
+     }
+   }
+
+   // for localizing the intermediate data
+   localize(request);
+ }
+
+ /** @return the attempt id this task was constructed with */
+ public QueryUnitAttemptId getTaskId() {
+   return this.taskId;
+ }
+
+ /** @return the logger shared by all tasks */
+ public static Log getLog() {
+   return Task.LOG;
+ }
+
+ /**
+  * Builds the fetchers for the fetches listed in the request and stores them on this task.
+  *
+  * @param request the request whose fetches are to be localized
+  * @throws IOException if the fetchers cannot be created
+  */
+ public void localize(QueryUnitRequest request) throws IOException {
+   final List<Fetch> fetches = request.getFetches();
+   this.fetcherRunners = getFetchRunners(context, fetches);
+ }
+
+ /** @return the attempt id held by the attempt context */
+ public QueryUnitAttemptId getId() {
+   return this.context.getTaskId();
+ }
+
+ /** @return the current state of the attempt, as held by the attempt context */
+ public TaskAttemptState getStatus() {
+   return this.context.getState();
+ }
+
+ /** @return a short description made of the attempt id and its current state */
+ public String toString() {
+   return "queryId: " + getId() + " status: " + getStatus();
+ }
+
+ /**
+  * Stores a new state in the attempt context.
+  *
+  * @param status the state to set
+  */
+ public void setState(TaskAttemptState status) {
+   this.context.setState(status);
+ }
+
+ /** @return the attempt context of this task */
+ public TaskAttemptContext getContext() {
+   return this.context;
+ }
+
+ /** @return {@code true} if at least one fetcher was created for this task */
+ public boolean hasFetchPhase() {
+   return !fetcherRunners.isEmpty();
+ }
+
+ /** Submits one {@link FetchRunner} per fetcher to the fetch launcher of the runner context. */
+ public void fetch() {
+   for (Fetcher fetcher : fetcherRunners) {
+     final FetchRunner runner = new FetchRunner(context, fetcher);
+     taskRunnerContext.getFetchLauncher().submit(runner);
+   }
+ }
+
+ /**
+  * Marks this task as killed, stops its attempt context, moves it to TA_KILLED and
+  * releases the fetcher channel factory.
+  */
+ public void kill() {
+   this.killed = true;
+   this.context.stop();
+   this.context.setState(TaskAttemptState.TA_KILLED);
+   this.releaseChannelFactory();
+ }
+
+ /** Marks this task as aborted, stops its attempt context and releases the fetcher channel factory. */
+ public void abort() {
+   this.aborted = true;
+   this.context.stop();
+   this.releaseChannelFactory();
+ }
+
+ /**
+  * Removes the work directory of a succeeded task and unregisters the task from the
+  * runner context. For any other state nothing is removed and the state is logged.
+  */
+ public void cleanUp() {
+   if (context.getState() != TaskAttemptState.TA_SUCCEEDED) {
+     LOG.error("QueryUnitAttemptId: " + context.getTaskId() + " status: " + context.getState());
+     return;
+   }
+
+   // remove itself from worker
+   try {
+     localFS.delete(context.getWorkDir(), true);
+     synchronized (taskRunnerContext.getTasks()) {
+       taskRunnerContext.getTasks().remove(this.getId());
+     }
+   } catch (IOException e) {
+     LOG.error(e.getMessage(), e);
+   }
+ }
+
+ /**
+  * Builds a status report with the worker name, attempt id, progress, state and input
+  * statistics; result statistics are included only when the context has them.
+  *
+  * @return the status report for this task
+  */
+ public TaskStatusProto getReport() {
+   final TaskStatusProto.Builder report = TaskStatusProto.newBuilder();
+   report.setWorkerName(taskRunnerContext.getNodeId());
+   report.setId(context.getTaskId().getProto());
+   report.setProgress(context.getProgress());
+   report.setState(context.getState());
+   report.setInputStats(reloadInputStats());
+
+   if (context.getResultStats() != null) {
+     report.setResultStats(context.getResultStats().getProto());
+   }
+   return report.build();
+ }
+
+ private CatalogProtos.TableStatsProto reloadInputStats() {
+ synchronized(inputStats) {
+ if (this.executor == null) {
+ return inputStats.getProto();
+ }
+
+ TableStats executorInputStats = this.executor.getInputStats();
+
+ if (executorInputStats != null) {
+ inputStats.setValues(executorInputStats);
+ }
+ return inputStats.getProto();
+ }
+ }
+
+ private TaskCompletionReport getTaskCompletionReport() {
+ TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder();
+ builder.setId(context.getTaskId().getProto());
+
+ builder.setInputStats(reloadInputStats());
+
+ if (context.hasResultStats()) {
+ builder.setResultStats(context.getResultStats().getProto());
+ } else {
+ builder.setResultStats(new TableStats().getProto());
+ }
+
+ Iterator<Entry<Integer,String>> it = context.getShuffleFileOutputs();
+ if (it.hasNext()) {
+ do {
+ Entry<Integer,String> entry = it.next();
+ ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder();
+ part.setPartId(entry.getKey());
+ builder.addShuffleFileOutputs(part.build());
+ } while (it.hasNext());
+ }
+
+ return builder.build();
+ }
+
+ private void waitForFetch() throws InterruptedException, IOException {
+ context.getFetchLatch().await();
+ LOG.info(context.getTaskId() + " All fetches are done!");
+ Collection<String> inputs = Lists.newArrayList(context.getInputTables());
+ for (String inputTable: inputs) {
+ File tableDir = new File(context.getFetchIn(), inputTable);
+ FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
+ context.updateAssignedFragments(inputTable, frags);
+ }
+ releaseChannelFactory();
+ }
+
+ /**
+  * Executes the task: waits for the fetch phase if there is one, runs the physical plan
+  * when there are fragments, and finally reports the outcome to the master proxy
+  * (status update when killed, fatal error when aborted or failed, completion otherwise).
+  * Whatever the outcome, the task counters are updated and cleanupTask() is called.
+  */
+ public void run() {
+ startTime = System.currentTimeMillis();
+ String errorMessage = null;
+ try {
+ context.setState(TaskAttemptState.TA_RUNNING);
+
+ if (context.hasFetchPhase()) {
+ // If the fetch is still in progress, the query unit must wait for
+ // complete.
+ waitForFetch();
+ context.setFetcherProgress(FETCHER_PROGRESS);
+ context.setProgress(FETCHER_PROGRESS);
+ }
+
+ if (context.getFragmentSize() > 0) {
+ this.executor = taskRunnerContext.getTQueryEngine().
+ createPlan(context, plan);
+ this.executor.init();
+ // Drain the executor; the loop stops early once the task has been killed.
+ while(!killed && executor.next() != null) {
+ }
+ this.executor.close();
+ // Take a last snapshot of the input stats before the executor reference is dropped.
+ reloadInputStats();
+ this.executor = null;
+ }
+ } catch (Exception e) {
+ // errorMessage will be sent to master.
+ errorMessage = ExceptionUtils.getStackTrace(e);
+ LOG.error(errorMessage);
+ aborted = true;
+ } finally {
+ context.setProgress(1.0f);
+ stopped = true;
+ completedTasksNum++;
+
+ if (killed || aborted) {
+ // A killed or aborted task reports zero progress.
+ context.setExecutorProgress(0.0f);
+ context.setProgress(0.0f);
+ if(killed) {
+ context.setState(TaskAttemptState.TA_KILLED);
+ masterProxy.statusUpdate(null, getReport(), NullCallback.get());
+ killedTasksNum++;
+ } else {
+ context.setState(TaskAttemptState.TA_FAILED);
+ TaskFatalErrorReport.Builder errorBuilder =
+ TaskFatalErrorReport.newBuilder()
+ .setId(getId().getProto());
+ // errorMessage stays null when abort() was called and no exception was caught above.
+ if (errorMessage != null) {
+ errorBuilder.setErrorMessage(errorMessage);
+ }
+
+ masterProxy.fatalError(null, errorBuilder.build(), NullCallback.get());
+ failedTasksNum++;
+ }
+
+ // stopping the status report
+ try {
+ reporter.stopCommunicationThread();
+ } catch (InterruptedException e) {
+ LOG.warn(e);
+ }
+
+ } else {
+ // if successful
+ context.setProgress(1.0f);
+ context.setState(TaskAttemptState.TA_SUCCEEDED);
+
+ // stopping the status report
+ try {
+ reporter.stopCommunicationThread();
+ } catch (InterruptedException e) {
+ LOG.warn(e);
+ }
+
+ // On success the reporter is stopped before the completion report is sent.
+ TaskCompletionReport report = getTaskCompletionReport();
+ masterProxy.done(null, report, NullCallback.get());
+ succeededTasksNum++;
+ }
+
+ finishTime = System.currentTimeMillis();
+
+ cleanupTask();
+ LOG.info("Task Counter - total:" + completedTasksNum + ", succeeded: " + succeededTasksNum
+ + ", killed: " + killedTasksNum + ", failed: " + failedTasksNum);
+ }
+ }
+
+ /**
+  * Records the task history in the runner context, unregisters the task and drops the
+  * references held by it. After this call {@code context} is null, so methods that
+  * dereference it (for example getId() or getStatus()) must not be called any more.
+  */
+ public void cleanupTask() {
+ // The history must be taken first: getTaskHistory() and getId() still need the context.
+ taskRunnerContext.addTaskHistory(getId(), getTaskHistory());
+ taskRunnerContext.getTasks().remove(getId());
+ taskRunnerContext = null;
+
+ fetcherRunners.clear();
+ executor = null;
+ plan = null;
+ context = null;
+ releaseChannelFactory();
+ }
+
+ /**
+  * Captures the current state of this task: timings, output and working paths (when set),
+  * status, progress, input/output statistics and, when there are fetchers, one history
+  * entry per fetcher keyed by its URI. If collecting the details fails, the stringified
+  * exception is stored as the status instead.
+  *
+  * @return a newly built history object
+  */
+ public TaskHistory getTaskHistory() {
+   final TaskHistory history = new TaskHistory();
+   history.setStartTime(startTime);
+   history.setFinishTime(finishTime);
+
+   if (context.getOutputPath() != null) {
+     history.setOutputPath(context.getOutputPath().toString());
+   }
+   if (context.getWorkDir() != null) {
+     history.setWorkingPath(context.getWorkDir().toString());
+   }
+
+   try {
+     history.setStatus(getStatus().toString());
+     history.setProgress(context.getProgress());
+     history.setInputStats(new TableStats(reloadInputStats()));
+
+     if (context.getResultStats() != null) {
+       history.setOutputStats((TableStats) context.getResultStats().clone());
+     }
+
+     if (hasFetchPhase()) {
+       final Map<URI, TaskHistory.FetcherHistory> byUri = new HashMap<URI, TaskHistory.FetcherHistory>();
+       for (Fetcher fetcher : fetcherRunners) {
+         final TaskHistory.FetcherHistory entry = new TaskHistory.FetcherHistory();
+         entry.setStartTime(fetcher.getStartTime());
+         entry.setFinishTime(fetcher.getFinishTime());
+         entry.setStatus(fetcher.getStatus());
+         entry.setUri(fetcher.getURI().toString());
+         entry.setFileLen(fetcher.getFileLen());
+         entry.setMessageReceiveCount(fetcher.getMessageReceiveCount());
+         byUri.put(fetcher.getURI(), entry);
+       }
+       history.setFetchers(byUri);
+     }
+   } catch (Exception e) {
+     history.setStatus(StringUtils.stringifyException(e));
+     e.printStackTrace();
+   }
+
+   return history;
+ }
+
+ /** @return the hash code of the attempt context */
+ public int hashCode() {
+   return this.context.hashCode();
+ }
+
+ /**
+  * Two tasks are equal when their attempt contexts are equal.
+  *
+  * @param obj the object to compare with
+  * @return {@code true} if {@code obj} is a Task with an equal context
+  */
+ public boolean equals(Object obj) {
+   if (!(obj instanceof Task)) {
+     return false;
+   }
+   return this.context.equals(((Task) obj).context);
+ }
+
+ private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta)
+ throws IOException {
+ Configuration c = new Configuration(systemConf);
+ c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
+ FileSystem fs = FileSystem.get(c);
+ Path tablePath = new Path(file.getAbsolutePath());
+
+ List<FileFragment> listTablets = new ArrayList<FileFragment>();
+ FileFragment tablet;
+
+ FileStatus[] fileLists = fs.listStatus(tablePath);
+ for (FileStatus f : fileLists) {
+ if (f.getLen() == 0) {
+ continue;
+ }
+ tablet = new FileFragment(name, f.getPath(), 0l, f.getLen());
+ listTablets.add(tablet);
+ }
+
+ FileFragment[] tablets = new FileFragment[listTablets.size()];
+ listTablets.toArray(tablets);
+
+ return tablets;
+ }
+
+ /**
+  * Runs one {@link Fetcher}, retrying a failed fetch up to five attempts in
+  * total with a one second pause before each retry. Whatever the outcome,
+  * {@code fetcherFinished(ctx)} is invoked exactly once so that the fetch
+  * latch of the attempt is released.
+  */
+ private class FetchRunner implements Runnable {
+   private final TaskAttemptContext ctx;
+   private final Fetcher fetcher;
+
+   /**
+    * @param ctx attempt context whose fetch latch is released when this runner ends
+    * @param fetcher the fetcher to execute
+    */
+   public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) {
+     this.ctx = ctx;
+     this.fetcher = fetcher;
+   }
+
+   @Override
+   public void run() {
+     int retryNum = 0;
+     int maxRetryNum = 5;
+     int retryWaitTime = 1000;
+
+     try { // for releasing fetch latch
+       while(retryNum < maxRetryNum) {
+         if (retryNum > 0) {
+           try {
+             Thread.sleep(retryWaitTime);
+           } catch (InterruptedException e) {
+             // Somebody asked this thread to stop: keep the interrupt status
+             // for the caller and give up instead of retrying further.
+             LOG.error("Interrupted while waiting to retry the fetch: " + fetcher.getURI(), e);
+             Thread.currentThread().interrupt();
+             break;
+           }
+           LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
+         }
+         try {
+           File fetched = fetcher.get();
+           if (fetched != null) {
+             break;
+           }
+         } catch (IOException e) {
+           LOG.error("Fetch failed: " + fetcher.getURI(), e);
+         }
+         retryNum++;
+       }
+     } finally {
+       fetcherFinished(ctx);
+     }
+
+     // Only reached with retryNum == maxRetryNum when every attempt failed.
+     if (retryNum == maxRetryNum) {
+       LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")");
+     }
+   }
+ }
+
+ /**
+  * Computes the progress contributed by the fetch phase: the fraction of
+  * fetchers that have finished, scaled by {@code FETCHER_PROGRESS}.
+  *
+  * @param totalFetcher total number of fetchers of the task
+  * @param remainFetcher number of fetchers that have not finished yet
+  * @return the scaled fetch progress, or {@code 0.0f} when
+  *         {@code totalFetcher} is not positive (the division would otherwise
+  *         yield NaN or an infinite value)
+  */
+ @VisibleForTesting
+ public static float adjustFetchProcess(int totalFetcher, int remainFetcher) {
+   if (totalFetcher <= 0) {
+     return 0.0f;
+   }
+   return ((float)(totalFetcher - remainFetcher)) / (float)totalFetcher * FETCHER_PROGRESS;
+ }
+
+ private synchronized void fetcherFinished(TaskAttemptContext ctx) {
+ int fetcherSize = fetcherRunners.size();
+ if(fetcherSize == 0) {
+ return;
+ }
+ try {
+ int numRunningFetcher = (int)(ctx.getFetchLatch().getCount()) - 1;
+
+ if (numRunningFetcher == 0) {
+ context.setProgress(FETCHER_PROGRESS);
+ } else {
+ context.setProgress(adjustFetchProcess(fetcherSize, numRunningFetcher));
+ }
+ } finally {
+ ctx.getFetchLatch().countDown();
+ }
+ }
+
+ /**
+  * Shuts down the current channel factory, releases its external resources
+  * and clears the field. A no-op when no factory is held.
+  */
+ private void releaseChannelFactory(){
+   if (channelFactory == null) {
+     return;
+   }
+   channelFactory.shutdown();
+   channelFactory.releaseExternalResources();
+   channelFactory = null;
+ }
+
+ /**
+  * Creates one {@link Fetcher} per requested fetch and registers the fetch
+  * phase on the attempt context.
+  *
+  * <p>When there is at least one fetch, any previously held channel factory
+  * is released and a new one is created, a store directory named after each
+  * fetch is created under the attempt's local input directory, and the i-th
+  * fetch is stored in the file {@code in_i} of its directory.
+  *
+  * @param ctx attempt context providing the configuration and the task id
+  * @param fetches the fetches to prepare
+  * @return the created fetchers; an empty list when {@code fetches} is empty
+  * @throws IOException if the local input directory cannot be resolved or a
+  *         store directory cannot be created
+  */
+ private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
+     List<Fetch> fetches) throws IOException {
+
+   if (fetches.isEmpty()) {
+     return Lists.newArrayList();
+   }
+
+   releaseChannelFactory();
+
+   int workerNum = ctx.getConf().getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM);
+   channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", workerNum);
+   Path inputDir = lDirAllocator.
+       getLocalPathToRead(
+           getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
+
+   int i = 0;
+   List<Fetcher> runnerList = Lists.newArrayList();
+   for (Fetch f : fetches) {
+     File storeDir = new File(inputDir.toString(), f.getName());
+     // mkdirs() also returns false when another thread created the directory
+     // in the meantime, hence the final isDirectory() check before failing.
+     if (!storeDir.exists() && !storeDir.mkdirs() && !storeDir.isDirectory()) {
+       throw new IOException("Failed to create the fetch store directory: " + storeDir);
+     }
+     File storeFile = new File(storeDir, "in_" + i);
+     Fetcher fetcher = new Fetcher(URI.create(f.getUrls()), storeFile, channelFactory);
+     runnerList.add(fetcher);
+     i++;
+   }
+   ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString()));
+   return runnerList;
+ }
+
+ /**
+  * Periodically reports this task to the query master from a dedicated
+  * thread: a status update when the progress has changed, a ping otherwise.
+  */
+ protected class Reporter {
+   private final QueryMasterProtocolService.Interface masterStub;
+   private Thread pingThread;
+   private final AtomicBoolean stop = new AtomicBoolean(false);
+   /** Pause between two reports, in milliseconds. */
+   private static final int PROGRESS_INTERVAL = 3000;
+   private final QueryUnitAttemptId taskId;
+
+   /**
+    * @param taskId id sent with every ping
+    * @param masterStub stub used to reach the query master
+    */
+   public Reporter(QueryUnitAttemptId taskId, QueryMasterProtocolService.Interface masterStub) {
+     this.taskId = taskId;
+     this.masterStub = masterStub;
+   }
+
+   /**
+    * Creates the body of the reporting thread. The loop runs until this
+    * reporter is stopped or the enclosing task's {@code stopped} flag is set.
+    * A report that throws consumes one retry; once {@code MAX_RETRIES}
+    * consecutive reports have failed the failure is rethrown as a
+    * {@link RuntimeException}, which ends the thread.
+    */
+   Runnable createReporterThread() {
+
+     return new Runnable() {
+       final int MAX_RETRIES = 3;
+       int remainingRetries = MAX_RETRIES;
+       @Override
+       public void run() {
+         while (!stop.get() && !stopped) {
+           // Refresh the executor's share of the progress; a failure here is
+           // only logged and never counts against the retries.
+           try {
+             if(executor != null && context.getProgress() < 1.0f) {
+               float progress = executor.getProgress();
+               context.setExecutorProgress(progress);
+             }
+           } catch (Throwable t) {
+             LOG.error("Get progress error: " + t.getMessage(), t);
+           }
+
+           try {
+             if (context.isPorgressChanged()) {
+               masterStub.statusUpdate(null, getReport(), NullCallback.get());
+             } else {
+               masterStub.ping(null, taskId.getProto(), NullCallback.get());
+             }
+             // The report went through: retries are meant to bound consecutive
+             // failures, so an earlier, recovered failure must not be carried over.
+             remainingRetries = MAX_RETRIES;
+           } catch (Throwable t) {
+             LOG.error(t.getMessage(), t);
+             remainingRetries -=1;
+             if (remainingRetries == 0) {
+               ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
+               LOG.warn("Last retry, exiting ");
+               throw new RuntimeException(t);
+             }
+           } finally {
+             // Skip the pause when the last retry has just been used up.
+             if (remainingRetries > 0) {
+               synchronized (pingThread) {
+                 try {
+                   pingThread.wait(PROGRESS_INTERVAL);
+                 } catch (InterruptedException e) {
+                   // Deliberately ignored: the loop condition re-checks the stop flags.
+                 }
+               }
+             }
+           }
+         }
+       }
+     };
+   }
+
+   /**
+    * Starts the reporting thread unless one has already been created.
+    */
+   public void startCommunicationThread() {
+     if (pingThread == null) {
+       pingThread = new Thread(createReporterThread());
+       pingThread.setName("communication thread");
+       pingThread.start();
+     }
+   }
+
+   /**
+    * Asks the reporting thread to stop. Only the first call has an effect.
+    * The method does not wait for the thread to end.
+    *
+    * @throws InterruptedException declared for callers; nothing in this
+    *         method's body throws it
+    */
+   public void stopCommunicationThread() throws InterruptedException {
+     if(stop.getAndSet(true)){
+       return;
+     }
+
+     if (pingThread != null) {
+       // The reporter waits on this same monitor between two reports, so the
+       // notification can only cut a pause short, never a ping or statusUpdate.
+       synchronized(pingThread) {
+         // Wake the reporter if it is pausing; it then sees the stop flag and exits.
+         pingThread.notifyAll();
+       }
+     }
+   }
+ }
+
+ public static final String FILECACHE = "filecache";
+ public static final String APPCACHE = "appcache";
+ public static final String USERCACHE = "usercache";
+
+ String fileCache;
+ public String getFileCacheDir() {
+ fileCache = USERCACHE + "/" + "hyunsik" + "/" + APPCACHE + "/" +
+ ConverterUtils.toString(ApplicationIdUtils.queryIdToAppId(taskId.getQueryUnitId().getExecutionBlockId().getQueryId())) +
+ "/" + "output";
+ return fileCache;
+ }
+
+ /**
+  * Builds the relative input directory of a task attempt:
+  * {@code <query id>/in/<execution block id>/<query unit number>/<attempt number>}.
+  *
+  * @param quid id of the task attempt
+  * @return the path concatenated from the components above
+  */
+ public static Path getTaskAttemptDir(QueryUnitAttemptId quid) {
+   String queryPart = quid.getQueryUnitId().getExecutionBlockId().getQueryId().toString();
+   String blockPart = quid.getQueryUnitId().getExecutionBlockId().toString();
+   String unitPart = String.valueOf(quid.getQueryUnitId().getId());
+   String attemptPart = String.valueOf(quid.getId());
+
+   return StorageUtil.concatPath(queryPart, "in", blockPart, unitPart, attemptPart);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
new file mode 100644
index 0000000..6f3281c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+
+import java.io.File;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
+
+/**
+ * Holds the state of one task attempt while it executes: its configuration,
+ * the input fragments grouped by table id, the attempt state and result
+ * statistics, the fetch-phase latch, the progress, and the shuffle outputs.
+ */
+public class TaskAttemptContext {
+  private static final Log LOG = LogFactory.getLog(TaskAttemptContext.class);
+  private final TajoConf conf;
+  /** Input fragments keyed by table id. */
+  private final Map<String, List<FragmentProto>> fragmentMap = Maps.newHashMap();
+
+  private TaskAttemptState state;
+  private TableStats resultStats;
+  private final QueryUnitAttemptId queryId;
+  private final Path workDir;
+  private boolean needFetch = false;
+  private CountDownLatch doneFetchPhaseSignal;
+  /**
+   * Volatile because the progress is written and read by different threads
+   * (for instance fetch workers and a progress-reporting thread).
+   */
+  private volatile float progress = 0.0f;
+  private float fetcherProgress = 0.0f;
+  private final AtomicBoolean progressChanged = new AtomicBoolean(false);
+
+  /** a map of shuffled file outputs */
+  private final Map<Integer, String> shuffleFileOutputs;
+  private File fetchIn;
+  private boolean stopped = false;
+  private boolean interQuery = false;
+  private Path outputPath;
+  private DataChannel dataChannel;
+  private Enforcer enforcer;
+
+  /**
+   * Creates a context in the {@code TA_PENDING} state.
+   *
+   * @param conf configuration of the attempt
+   * @param queryId id of the task attempt
+   * @param fragments input fragments; they are grouped by their id
+   * @param workDir working directory of the attempt
+   */
+  public TaskAttemptContext(TajoConf conf, final QueryUnitAttemptId queryId,
+                            final FragmentProto[] fragments,
+                            final Path workDir) {
+    this.conf = conf;
+    this.queryId = queryId;
+
+    for (FragmentProto t : fragments) {
+      addFragment(t.getId(), t);
+    }
+
+    this.workDir = workDir;
+    this.shuffleFileOutputs = Maps.newHashMap();
+
+    state = TaskAttemptState.TA_PENDING;
+  }
+
+  /**
+   * Same as the main constructor, taking {@link Fragment}s that are converted
+   * to their protobuf form first.
+   */
+  @VisibleForTesting
+  public TaskAttemptContext(TajoConf conf, final QueryUnitAttemptId queryId,
+                            final Fragment [] fragments,  final Path workDir) {
+    this(conf, queryId, FragmentConvertor.toFragmentProtoArray(fragments), workDir);
+  }
+
+  /** Appends a fragment to the list kept for the given table id, creating the list on first use. */
+  private void addFragment(String tableId, FragmentProto fragment) {
+    List<FragmentProto> frags = fragmentMap.get(tableId);
+    if (frags == null) {
+      frags = new ArrayList<FragmentProto>();
+      fragmentMap.put(tableId, frags);
+    }
+    frags.add(fragment);
+  }
+
+  public TajoConf getConf() {
+    return this.conf;
+  }
+
+  public TaskAttemptState getState() {
+    return this.state;
+  }
+
+  /** Sets the attempt state and logs the transition. */
+  public void setState(TaskAttemptState state) {
+    this.state = state;
+    LOG.info("Query status of " + getTaskId() + " is changed to " + state);
+  }
+
+  public void setDataChannel(DataChannel dataChannel) {
+    this.dataChannel = dataChannel;
+  }
+
+  public DataChannel getDataChannel() {
+    return dataChannel;
+  }
+
+  public void setEnforcer(Enforcer enforcer) {
+    this.enforcer = enforcer;
+  }
+
+  public Enforcer getEnforcer() {
+    return this.enforcer;
+  }
+
+  public boolean hasResultStats() {
+    return resultStats != null;
+  }
+
+  public void setResultStats(TableStats stats) {
+    this.resultStats = stats;
+  }
+
+  public TableStats getResultStats() {
+    return this.resultStats;
+  }
+
+  public boolean isStopped() {
+    return this.stopped;
+  }
+
+  public void setInterQuery() {
+    this.interQuery = true;
+  }
+
+  public void setOutputPath(Path outputPath) {
+    this.outputPath = outputPath;
+  }
+
+  public Path getOutputPath() {
+    return this.outputPath;
+  }
+
+  public boolean isInterQuery() {
+    return this.interQuery;
+  }
+
+  public void stop() {
+    this.stopped = true;
+  }
+
+  /**
+   * Marks the attempt as having a fetch phase.
+   *
+   * @param count number of fetchers the fetch latch waits for
+   * @param fetchIn directory that receives the fetched data
+   */
+  public void addFetchPhase(int count, File fetchIn) {
+    this.needFetch = true;
+    this.doneFetchPhaseSignal = new CountDownLatch(count);
+    this.fetchIn = fetchIn;
+  }
+
+  public File getFetchIn() {
+    return this.fetchIn;
+  }
+
+  public boolean hasFetchPhase() {
+    return this.needFetch;
+  }
+
+  /** @return the fetch latch, or {@code null} if no fetch phase was added */
+  public CountDownLatch getFetchLatch() {
+    return doneFetchPhaseSignal;
+  }
+
+  public void addShuffleFileOutput(int partId, String fileName) {
+    shuffleFileOutputs.put(partId, fileName);
+  }
+
+  public Iterator<Entry<Integer,String>> getShuffleFileOutputs() {
+    return shuffleFileOutputs.entrySet().iterator();
+  }
+
+  /**
+   * Drops the fragments registered under {@code tableId} and then registers
+   * the given fragments, each under its own table name.
+   */
+  public void updateAssignedFragments(String tableId, Fragment[] fragments) {
+    fragmentMap.remove(tableId);
+    for (Fragment t : fragments) {
+      addFragment(t.getTableName(), t.getProto());
+    }
+  }
+
+  public Path getWorkDir() {
+    return this.workDir;
+  }
+
+  public QueryUnitAttemptId getTaskId() {
+    return this.queryId;
+  }
+
+  public float getProgress() {
+    return this.progress;
+  }
+
+  /**
+   * Sets the progress and records whether it differs from the previous value.
+   * NaN and infinite values are stored as {@code 0.0f}: they are not
+   * meaningful progress values, and NaN compares unequal to everything, which
+   * would otherwise leave the changed flag permanently set.
+   */
+  public void setProgress(float progress) {
+    float previousProgress = this.progress;
+    float sanitized = (Float.isNaN(progress) || Float.isInfinite(progress)) ? 0.0f : progress;
+    this.progress = sanitized;
+    progressChanged.set(previousProgress != sanitized);
+  }
+
+  /**
+   * Misspelled name kept for existing callers; same as {@link #isProgressChanged()}.
+   */
+  public boolean isPorgressChanged() {
+    return progressChanged.get();
+  }
+
+  /** @return whether the latest {@link #setProgress(float)} call changed the progress */
+  public boolean isProgressChanged() {
+    return progressChanged.get();
+  }
+
+  /**
+   * Sets the overall progress from the executor's progress, scaled into the
+   * share that is left once the fetcher progress is taken out.
+   */
+  public void setExecutorProgress(float executorProgress) {
+    float adjustProgress = executorProgress * (1 - fetcherProgress);
+    setProgress(fetcherProgress + adjustProgress);
+  }
+
+  public void setFetcherProgress(float fetcherProgress) {
+    this.fetcherProgress = fetcherProgress;
+  }
+
+  /** @return the first fragment of the table, or {@code null} if none is registered */
+  public FragmentProto getTable(String id) {
+    List<FragmentProto> frags = fragmentMap.get(id);
+    if (frags == null) {
+      //for empty table
+      return null;
+    }
+    return frags.get(0);
+  }
+
+  /** @return the number of distinct table ids that have fragments */
+  public int getFragmentSize() {
+    return fragmentMap.size();
+  }
+
+  public Collection<String> getInputTables() {
+    return fragmentMap.keySet();
+  }
+
+  /** @return all fragments of the table, or {@code null} if none is registered */
+  public FragmentProto [] getTables(String id) {
+    List<FragmentProto> frags = fragmentMap.get(id);
+    if (frags == null) {
+      //for empty table
+      return null;
+    }
+    return frags.toArray(new FragmentProto[frags.size()]);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(queryId);
+  }
+
+  /**
+   * Two contexts are equal when their task attempt ids are equal. The
+   * comparison is null-safe, matching {@link #hashCode()}.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof TaskAttemptContext) {
+      TaskAttemptContext other = (TaskAttemptContext) obj;
+      return Objects.equal(queryId, other.getTaskId());
+    } else {
+      return false;
+    }
+  }
+}
\ No newline at end of file