You are viewing a plain-text version of this content. The link to the canonical version (the word "here" in the original page) was lost in this plain-text rendering.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/08/14 08:48:03 UTC
[5/8] TAJO-91: Launch QueryMaster on NodeManager per query.
(hyoungjunkim via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java
index 8e0abd0..d8ddb46 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java
@@ -21,76 +21,45 @@ package org.apache.tajo.master;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.ProtoUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.tajo.QueryConf;
import org.apache.tajo.SubQueryId;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.QueryMaster.QueryContext;
import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
import org.apache.tajo.master.event.QueryEvent;
import org.apache.tajo.master.event.QueryEventType;
-import org.apache.tajo.pullserver.PullServerAuxService;
+import org.apache.tajo.master.querymaster.QueryMaster;
+import org.apache.tajo.master.querymaster.QueryMaster.QueryContext;
+import org.apache.tajo.worker.TaskRunner;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
-import java.util.*;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunnerLauncher {
/** Class Logger */
private static final Log LOG = LogFactory.getLog(TaskRunnerLauncherImpl.class);
- private final YarnRPC yarnRPC;
- private final static RecordFactory recordFactory =
- RecordFactoryProvider.getRecordFactory(null);
private QueryContext context;
- private final String taskListenerHost;
- private final int taskListenerPort;
+ private final String queryMasterHost;
+ private final int queryMasterPort;
// For ContainerLauncherSpec
- private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
- private static String initialClasspath = null;
- private static final Object classpathLock = new Object();
- private Object commonContainerSpecLock = new Object();
private ContainerLaunchContext commonContainerSpec = null;
- final public static FsPermission QUERYCONF_FILE_PERMISSION =
- FsPermission.createImmutable((short) 0644); // rw-r--r--
-
/** for launching TaskRunners in parallel */
private final ExecutorService executorService;
public TaskRunnerLauncherImpl(QueryContext context) {
super(TaskRunnerLauncherImpl.class.getName());
this.context = context;
- taskListenerHost = context.getTaskListener().getHostName();
- taskListenerPort = context.getTaskListener().getPort();
- yarnRPC = context.getYarnRPC();
+ queryMasterHost = context.getQueryMasterServiceAddress().getHostName();
+ queryMasterPort = context.getQueryMasterServiceAddress().getPort();
executorService = Executors.newFixedThreadPool(
context.getConf().getIntVar(TajoConf.ConfVars.AM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
}
@@ -100,7 +69,22 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
}
public void stop() {
- executorService.shutdown();
+ executorService.shutdownNow();
+
+ while(!executorService.isTerminated()) {
+ LOG.info("executorService.isTerminated:" + executorService.isTerminated() + "," + executorService.isShutdown());
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ }
+ Map<ContainerId, ContainerProxy> containers = context.getContainers();
+ for(ContainerProxy eachProxy: containers.values()) {
+ try {
+ eachProxy.kill();
+ } catch (Exception e) {
+ }
+ }
super.stop();
}
@@ -114,427 +98,82 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
}
private void launchTaskRunners(SubQueryId subQueryId, Collection<Container> containers) {
+ commonContainerSpec = ContainerProxy.createCommonContainerLaunchContext(getConfig());
for (Container container : containers) {
- final ContainerProxy proxy = new ContainerProxy(container, subQueryId);
- executorService.submit(new LaunchRunner(proxy));
+ final ContainerProxy proxy =
+ new TaskRunnerContainerProxy(context, getConfig(), context.getYarnRPC(), container, subQueryId);
+ executorService.submit(new LaunchRunner(container.getId(), proxy));
}
}
private class LaunchRunner implements Runnable {
private final ContainerProxy proxy;
- public LaunchRunner(ContainerProxy proxy) {
+ private final ContainerId id;
+
+ public LaunchRunner(ContainerId id, ContainerProxy proxy) {
this.proxy = proxy;
+ this.id = id;
}
@Override
public void run() {
- proxy.launch();
+ proxy.launch(commonContainerSpec);
+ LOG.info("ContainerProxy started:" + id);
}
}
private void killTaskRunners(Collection<Container> containers) {
for (Container container : containers) {
final ContainerProxy proxy = context.getContainer(container.getId());
- executorService.submit(new KillRunner(proxy));
+ executorService.submit(new KillRunner(container.getId(), proxy));
}
}
private class KillRunner implements Runnable {
private final ContainerProxy proxy;
- public KillRunner(ContainerProxy proxy) {
+ private final ContainerId id;
+ public KillRunner(ContainerId id, ContainerProxy proxy) {
+ this.id = id;
this.proxy = proxy;
}
@Override
public void run() {
proxy.kill();
+ LOG.info("ContainerProxy killed:" + id);
}
}
-
- /**
- * Lock this on initialClasspath so that there is only one fork in the AM for
- * getting the initial class-path. TODO: We already construct
- * a parent CLC and use it for all the containers, so this should go away
- * once the mr-generated-classpath stuff is gone.
- */
- private static String getInitialClasspath(Configuration conf) {
- synchronized (classpathLock) {
- if (initialClasspathFlag.get()) {
- return initialClasspath;
- }
- Map<String, String> env = new HashMap<String, String>();
-
- initialClasspath = env.get(Environment.CLASSPATH.name());
- initialClasspathFlag.set(true);
- return initialClasspath;
- }
- }
-
- private ContainerLaunchContext createCommonContainerLaunchContext() {
- TajoConf conf = (TajoConf) getConfig();
-
- ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
- try {
- ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // Set the env variables to be setup
- ////////////////////////////////////////////////////////////////////////////
- LOG.info("Set the environment for the application master");
-
- Map<String, String> environment = new HashMap<String, String>();
- //String initialClassPath = getInitialClasspath(conf);
- environment.put(Environment.SHELL.name(), "/bin/bash");
- environment.put(Environment.JAVA_HOME.name(), System.getenv(Environment.JAVA_HOME.name()));
-
- // TODO - to be improved with org.apache.tajo.sh shell script
- Properties prop = System.getProperties();
- if (prop.getProperty("tajo.test", "FALSE").equalsIgnoreCase("TRUE")) {
- environment.put(Environment.CLASSPATH.name(), prop.getProperty(
- "java.class.path", null));
- } else {
- // Add AppMaster.jar location to classpath
- // At some point we should not be required to add
- // the hadoop specific classpaths to the env.
- // It should be provided out of the box.
- // For now setting all required classpaths including
- // the classpath to "." for the application jar
- StringBuilder classPathEnv = new StringBuilder("./");
- //for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) {
- for (String c : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH) {
- classPathEnv.append(':');
- classPathEnv.append(c.trim());
- }
-
- classPathEnv.append(":" + System.getenv("TAJO_BASE_CLASSPATH"));
- classPathEnv.append(":./log4j.properties:./*");
- environment.put("HADOOP_HOME", System.getenv("HADOOP_HOME"));
- environment.put(
- Environment.HADOOP_COMMON_HOME.name(),
- System.getenv("HADOOP_HOME"));
- environment.put(
- Environment.HADOOP_HDFS_HOME.name(),
- System.getenv("HADOOP_HOME"));
- environment.put(
- Environment.HADOOP_YARN_HOME.name(),
- System.getenv("HADOOP_HOME"));
- environment.put("TAJO_BASE_CLASSPATH", System.getenv("TAJO_BASE_CLASSPATH"));
- environment.put(Environment.CLASSPATH.name(), classPathEnv.toString());
- }
-
- ctx.setEnvironment(environment);
-
- ////////////////////////////////////////////////////////////////////////////
- // Set the local resources
- ////////////////////////////////////////////////////////////////////////////
- Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
- FileSystem fs = null;
-
-
- LOG.info("defaultFS: " + conf.get("fs.default.name"));
- LOG.info("defaultFS: " + conf.get("fs.defaultFS"));
- try {
- fs = FileSystem.get(conf);
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- FileContext fsCtx = null;
- try {
- fsCtx = FileContext.getFileContext(getConfig());
- } catch (UnsupportedFileSystemException e) {
- e.printStackTrace();
- }
-
- LOG.info("Writing a QueryConf to HDFS and add to local environment");
- Path queryConfPath = new Path(fs.getHomeDirectory(), QueryConf.FILENAME);
- try {
- writeConf(conf, queryConfPath);
-
- LocalResource queryConfSrc = createApplicationResource(fsCtx,
- queryConfPath, LocalResourceType.FILE);
- localResources.put(QueryConf.FILENAME, queryConfSrc);
-
- ctx.setLocalResources(localResources);
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- // Add shuffle token
- Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
- try {
- //LOG.info("Putting shuffle token in serviceData");
- serviceData.put(PullServerAuxService.PULLSERVER_SERVICEID,
- PullServerAuxService.serializeMetaData(0));
- } catch (IOException ioe) {
- LOG.error(ioe);
- }
- ctx.setServiceData(serviceData);
-
- return ctx;
- }
-
- protected ContainerManager getCMProxy(ContainerId containerID,
- final String containerManagerBindAddr,
- ContainerToken containerToken)
- throws IOException {
- String [] hosts = containerManagerBindAddr.split(":");
- final InetSocketAddress cmAddr =
- new InetSocketAddress(hosts[0], Integer.parseInt(hosts[1]));
- UserGroupInformation user = UserGroupInformation.getCurrentUser();
-
- if (UserGroupInformation.isSecurityEnabled()) {
- Token<ContainerTokenIdentifier> token =
- ProtoUtils.convertFromProtoFormat(containerToken, cmAddr);
- // the user in createRemoteUser in this context has to be ContainerID
- user = UserGroupInformation.createRemoteUser(containerID.toString());
- user.addToken(token);
- }
-
- ContainerManager proxy = user
- .doAs(new PrivilegedAction<ContainerManager>() {
- @Override
- public ContainerManager run() {
- return (ContainerManager) yarnRPC.getProxy(ContainerManager.class,
- cmAddr, getConfig());
- }
- });
- return proxy;
- }
-
- private LocalResource createApplicationResource(FileContext fs,
- Path p, LocalResourceType type)
- throws IOException {
- LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
- FileStatus rsrcStat = fs.getFileStatus(p);
- rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
- .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
- rsrc.setSize(rsrcStat.getLen());
- rsrc.setTimestamp(rsrcStat.getModificationTime());
- rsrc.setType(type);
- rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
- return rsrc;
- }
-
- private void writeConf(Configuration conf, Path queryConfFile)
- throws IOException {
- // Write job file to Tajo's fs
- FileSystem fs = queryConfFile.getFileSystem(conf);
- FSDataOutputStream out =
- FileSystem.create(fs, queryConfFile,
- new FsPermission(QUERYCONF_FILE_PERMISSION));
- try {
- conf.writeXml(out);
- } finally {
- out.close();
- }
- }
-
- private static enum ContainerState {
- PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
- }
-
- public class ContainerProxy {
- private ContainerState state;
- // store enough information to be able to cleanup the container
- private Container container;
- private ContainerId containerID;
- final private String containerMgrAddress;
- private ContainerToken containerToken;
- private String hostName;
- private int port = -1;
+ public class TaskRunnerContainerProxy extends ContainerProxy {
private final SubQueryId subQueryId;
- public ContainerProxy(Container container, SubQueryId subQueryId) {
- this.state = ContainerState.PREP;
- this.container = container;
- this.containerID = container.getId();
- NodeId nodeId = container.getNodeId();
- this.containerMgrAddress = nodeId.getHost() + ":" + nodeId.getPort();;
- this.containerToken = container.getContainerToken();
+ public TaskRunnerContainerProxy(QueryMaster.QueryContext context, Configuration conf, YarnRPC yarnRPC,
+ Container container, SubQueryId subQueryId) {
+ super(context, conf, yarnRPC, container);
this.subQueryId = subQueryId;
}
- public synchronized boolean isCompletelyDone() {
- return state == ContainerState.DONE || state == ContainerState.FAILED;
- }
-
- @SuppressWarnings("unchecked")
- public synchronized void launch() {
- LOG.info("Launching Container with Id: " + containerID);
- if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
- state = ContainerState.DONE;
- LOG.error("Container (" + containerID + " was killed before it was launched");
- return;
- }
-
- ContainerManager proxy = null;
- try {
-
- proxy = getCMProxy(containerID, containerMgrAddress,
- containerToken);
-
- // Construct the actual Container
- ContainerLaunchContext containerLaunchContext = createContainerLaunchContext();
-
- // Now launch the actual container
- StartContainerRequest startRequest = Records
- .newRecord(StartContainerRequest.class);
- startRequest.setContainerLaunchContext(containerLaunchContext);
- StartContainerResponse response = proxy.startContainer(startRequest);
-
- ByteBuffer portInfo = response
- .getServiceResponse(PullServerAuxService.PULLSERVER_SERVICEID);
-
- if(portInfo != null) {
- port = PullServerAuxService.deserializeMetaData(portInfo);
- }
-
- LOG.info("PullServer port returned by ContainerManager for "
- + containerID + " : " + port);
-
- if(port < 0) {
- this.state = ContainerState.FAILED;
- throw new IllegalStateException("Invalid shuffle port number "
- + port + " returned for " + containerID);
- }
-
- // after launching, send launched event to task attempt to move
- // it from ASSIGNED to RUNNING state
-// context.getEventHandler().handle(new AMContainerEventLaunched(containerID, port));
-
- // this is workaround code
- context.getEventHandler().handle(new QueryEvent(context.getQueryId(), QueryEventType.INIT_COMPLETED));
-
- this.state = ContainerState.RUNNING;
- this.hostName = containerMgrAddress.split(":")[0];
- context.addContainer(containerID, this);
- } catch (Throwable t) {
- String message = "Container launch failed for " + containerID + " : "
- + StringUtils.stringifyException(t);
- this.state = ContainerState.FAILED;
- LOG.error(message);
- } finally {
- if (proxy != null) {
- yarnRPC.stopProxy(proxy, getConfig());
- }
- }
+ @Override
+ protected void containerStarted() {
+ context.getEventHandler().handle(new QueryEvent(context.getQueryId(), QueryEventType.INIT_COMPLETED));
}
- public synchronized void kill() {
-
- if(isCompletelyDone()) {
- return;
- }
- if(this.state == ContainerState.PREP) {
- this.state = ContainerState.KILLED_BEFORE_LAUNCH;
- } else {
- LOG.info("KILLING " + containerID);
-
- ContainerManager proxy = null;
- try {
- proxy = getCMProxy(this.containerID, this.containerMgrAddress,
- this.containerToken);
-
- // kill the remote container if already launched
- StopContainerRequest stopRequest = Records
- .newRecord(StopContainerRequest.class);
- stopRequest.setContainerId(this.containerID);
- proxy.stopContainer(stopRequest);
- // If stopContainer returns without an error, assuming the stop made
- // it over to the NodeManager.
-// context.getEventHandler().handle(
-// new AMContainerEvent(containerID, AMContainerEventType.C_NM_STOP_SENT));
- context.removeContainer(containerID);
- } catch (Throwable t) {
-
- // ignore the cleanup failure
- String message = "cleanup failed for container "
- + this.containerID + " : "
- + StringUtils.stringifyException(t);
-// context.getEventHandler().handle(
-// new AMContainerEventStopFailed(containerID, message));
- LOG.warn(message);
- this.state = ContainerState.DONE;
- return;
- } finally {
- if (proxy != null) {
- yarnRPC.stopProxy(proxy, getConfig());
- }
- }
- this.state = ContainerState.DONE;
- }
+ @Override
+ protected String getId() {
+ return subQueryId.toString();
}
- public ContainerLaunchContext createContainerLaunchContext() {
- synchronized (commonContainerSpecLock) {
- if (commonContainerSpec == null) {
- commonContainerSpec = createCommonContainerLaunchContext();
- }
- }
-
- // Setup environment by cloning from common env.
- Map<String, String> env = commonContainerSpec.getEnvironment();
- Map<String, String> myEnv = new HashMap<String, String>(env.size());
- myEnv.putAll(env);
-
- // Duplicate the ByteBuffers for access by multiple containers.
- Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
- for (Map.Entry<String, ByteBuffer> entry : commonContainerSpec
- .getServiceData().entrySet()) {
- myServiceData.put(entry.getKey(), entry.getValue().duplicate());
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // Set the local resources
- ////////////////////////////////////////////////////////////////////////////
- // Set the necessary command to execute the application master
- Vector<CharSequence> vargs = new Vector<CharSequence>(30);
-
- // Set java executable command
- //LOG.info("Setting up app master command");
- vargs.add("${JAVA_HOME}" + "/bin/java");
- // Set Xmx based on am memory size
- vargs.add("-Xmx2000m");
- // Set Remote Debugging
- //if (!context.getQuery().getSubQuery(event.getSubQueryId()).isLeafQuery()) {
- //vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
- //}
- // Set class name
- vargs.add("org.apache.tajo.worker.TaskRunner");
- vargs.add(taskListenerHost); // tasklistener hostname
- vargs.add(String.valueOf(taskListenerPort)); // tasklistener hostname
- vargs.add(subQueryId.toString()); // subqueryId
- vargs.add(containerMgrAddress); // nodeId
- vargs.add(containerID.toString()); // containerId
-
- vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
- vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
-
- // Get final commmand
- StringBuilder command = new StringBuilder();
- for (CharSequence str : vargs) {
- command.append(str).append(" ");
- }
-
- LOG.info("Completed setting up TaskRunner command " + command.toString());
- List<String> commands = new ArrayList<String>();
- commands.add(command.toString());
-
- return BuilderUtils.newContainerLaunchContext(containerID, commonContainerSpec.getUser(),
- container.getResource(), commonContainerSpec.getLocalResources(), myEnv, commands,
- myServiceData, null, new HashMap<ApplicationAccessType, String>());
+ @Override
+ protected String getRunnerClass() {
+ return TaskRunner.class.getCanonicalName();
}
- public String getHostName() {
- return this.hostName;
- }
+ @Override
+ protected Vector<CharSequence> getTaskParams() {
+ Vector<CharSequence> taskParams = new Vector<CharSequence>();
+ taskParams.add(queryMasterHost); // queryMaster hostname
+ taskParams.add(String.valueOf(queryMasterPort)); // queryMaster port
- public int getPullServerPort() {
- return this.port;
+ return taskParams;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerListener.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerListener.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerListener.java
deleted file mode 100644
index ce16897..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerListener.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
-import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.TajoIdProtos.QueryUnitAttemptIdProto;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.MasterWorkerProtos.QueryUnitRequestProto;
-import org.apache.tajo.engine.MasterWorkerProtos.TaskCompletionReport;
-import org.apache.tajo.engine.MasterWorkerProtos.TaskFatalErrorReport;
-import org.apache.tajo.engine.MasterWorkerProtos.TaskStatusProto;
-import org.apache.tajo.ipc.MasterWorkerProtocol;
-import org.apache.tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService;
-import org.apache.tajo.master.QueryMaster.QueryContext;
-import org.apache.tajo.master.event.TaskAttemptStatusUpdateEvent;
-import org.apache.tajo.master.event.TaskCompletionEvent;
-import org.apache.tajo.master.event.TaskFatalErrorEvent;
-import org.apache.tajo.master.event.TaskRequestEvent;
-import org.apache.tajo.rpc.ProtoAsyncRpcServer;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-public class TaskRunnerListener extends AbstractService
- implements MasterWorkerProtocolService.Interface {
-
- private final static Log LOG = LogFactory.getLog(
- org.apache.tajo.master.cluster.WorkerListener.class);
- private QueryContext context;
- private ProtoAsyncRpcServer rpcServer;
- private InetSocketAddress bindAddr;
- private String addr;
-
- public TaskRunnerListener(final QueryContext context) throws Exception {
- super(org.apache.tajo.master.cluster.WorkerListener.class.getName());
- this.context = context;
-
-
- InetSocketAddress initIsa =
- new InetSocketAddress(InetAddress.getLocalHost(), 0);
- if (initIsa.getAddress() == null) {
- throw new IllegalArgumentException("Failed resolve of " + initIsa);
- }
- try {
- this.rpcServer = new ProtoAsyncRpcServer(MasterWorkerProtocol.class,
- this, initIsa);
- } catch (Exception e) {
- LOG.error(e);
- }
- this.rpcServer.start();
- this.bindAddr = rpcServer.getBindAddress();
- this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
- }
-
- @Override
- public void init(Configuration conf) {
- // Setup RPC server
- try {
- InetSocketAddress initIsa =
- new InetSocketAddress(InetAddress.getLocalHost(), 0);
- if (initIsa.getAddress() == null) {
- throw new IllegalArgumentException("Failed resolve of " + initIsa);
- }
-
- this.rpcServer = new ProtoAsyncRpcServer(MasterWorkerProtocol.class,
- this, initIsa);
-
- this.rpcServer.start();
- this.bindAddr = rpcServer.getBindAddress();
- this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
-
- } catch (Exception e) {
- LOG.error(e);
- }
-
- // Get the master address
- LOG.info(org.apache.tajo.master.cluster.WorkerListener.class.getSimpleName() + " is bind to " + addr);
- context.getConf().setVar(TajoConf.ConfVars.TASKRUNNER_LISTENER_ADDRESS, addr);
-
- super.init(conf);
- }
-
- @Override
- public void start() {
-
-
- super.start();
- }
-
- @Override
- public void stop() {
- rpcServer.shutdown();
- super.stop();
- }
-
- public InetSocketAddress getBindAddress() {
- return this.bindAddr;
- }
-
- public String getAddress() {
- return this.addr;
- }
-
- static BoolProto TRUE_PROTO = BoolProto.newBuilder().setValue(true).build();
-
- @Override
- public void getTask(RpcController controller, ContainerIdProto request,
- RpcCallback<QueryUnitRequestProto> done) {
- context.getEventHandler().handle(new TaskRequestEvent(
- new ContainerIdPBImpl(request), done));
- }
-
- @Override
- public void statusUpdate(RpcController controller, TaskStatusProto request,
- RpcCallback<BoolProto> done) {
- QueryUnitAttemptId attemptId = new QueryUnitAttemptId(request.getId());
- context.getEventHandler().handle(new TaskAttemptStatusUpdateEvent(attemptId,
- request));
- done.run(TRUE_PROTO);
- }
-
- @Override
- public void ping(RpcController controller,
- QueryUnitAttemptIdProto attemptIdProto,
- RpcCallback<BoolProto> done) {
- // TODO - to be completed
- QueryUnitAttemptId attemptId = new QueryUnitAttemptId(attemptIdProto);
-// context.getQuery(attemptId.getQueryId()).getSubQuery(attemptId.getSubQueryId()).
-// getQueryUnit(attemptId.getQueryUnitId()).getAttempt(attemptId).
-// resetExpireTime();
- done.run(TRUE_PROTO);
- }
-
- @Override
- public void fatalError(RpcController controller, TaskFatalErrorReport report,
- RpcCallback<BoolProto> done) {
- context.getEventHandler().handle(new TaskFatalErrorEvent(report));
- done.run(TRUE_PROTO);
- }
-
- @Override
- public void done(RpcController controller, TaskCompletionReport report,
- RpcCallback<BoolProto> done) {
- context.getEventHandler().handle(new TaskCompletionEvent(report));
- done.run(TRUE_PROTO);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index 381c333..62e702d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -29,18 +29,18 @@ import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.SubQueryId;
-import org.apache.tajo.engine.MasterWorkerProtos;
import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.engine.query.QueryUnitRequestImpl;
+import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest;
-import org.apache.tajo.master.QueryMaster.QueryContext;
-import org.apache.tajo.master.TaskRunnerLauncherImpl.ContainerProxy;
+import org.apache.tajo.master.querymaster.QueryMaster.QueryContext;
import org.apache.tajo.master.event.TaskAttemptAssignedEvent;
import org.apache.tajo.master.event.TaskRequestEvent;
import org.apache.tajo.master.event.TaskRequestEvent.TaskRequestEventType;
import org.apache.tajo.master.event.TaskScheduleEvent;
import org.apache.tajo.master.event.TaskSchedulerEvent;
import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
+import org.apache.tajo.master.querymaster.QueryUnit;
import org.apache.tajo.storage.Fragment;
import org.apache.tajo.util.TajoIdUtils;
@@ -96,9 +96,11 @@ public class TaskSchedulerImpl extends AbstractService
event = eventQueue.take();
handleEvent(event);
} catch (InterruptedException e) {
- LOG.error("Returning, iterrupted : " + e);
+ //LOG.error("Returning, iterrupted : " + e);
+ break;
}
}
+ LOG.info("TaskScheduler eventHandlingThread stopped");
}
};
@@ -111,11 +113,12 @@ public class TaskSchedulerImpl extends AbstractService
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
- LOG.warn(e);
+ break;
}
schedule();
}
+ LOG.info("TaskScheduler schedulingThread stopped");
}
};
@@ -124,14 +127,13 @@ public class TaskSchedulerImpl extends AbstractService
}
private static final QueryUnitAttemptId NULL_ID;
- private static final MasterWorkerProtos.QueryUnitRequestProto stopTaskRunnerReq;
+ private static final QueryMasterProtocol.QueryUnitRequestProto stopTaskRunnerReq;
static {
SubQueryId nullSubQuery =
QueryIdFactory.newSubQueryId(TajoIdUtils.NullQueryId);
NULL_ID = QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(nullSubQuery, 0), 0);
- MasterWorkerProtos.QueryUnitRequestProto.Builder builder =
- MasterWorkerProtos.QueryUnitRequestProto.newBuilder();
+ QueryMasterProtocol.QueryUnitRequestProto.Builder builder = QueryMasterProtocol.QueryUnitRequestProto.newBuilder();
builder.setId(NULL_ID.getProto());
builder.setShouldDie(true);
builder.setOutputTable("");
@@ -151,6 +153,7 @@ public class TaskSchedulerImpl extends AbstractService
req.getCallback().run(stopTaskRunnerReq);
}
+ LOG.info("Task Scheduler stopped");
super.stop();
}
@@ -301,7 +304,7 @@ public class TaskSchedulerImpl extends AbstractService
while (it.hasNext() && leafTasks.size() > 0) {
taskRequest = it.next();
ContainerProxy container = context.getContainer(taskRequest.getContainerId());
- String hostName = container.getHostName();
+ String hostName = container.getTaskHostName();
QueryUnitAttemptId attemptId = null;
@@ -360,7 +363,7 @@ public class TaskSchedulerImpl extends AbstractService
context.getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
taskRequest.getContainerId(),
- container.getHostName(), container.getPullServerPort()));
+ container.getTaskHostName(), container.getTaskPort()));
AssignedRequest.add(attemptId);
totalAssigned++;
@@ -411,7 +414,7 @@ public class TaskSchedulerImpl extends AbstractService
ContainerProxy container = context.getContainer(
taskRequest.getContainerId());
context.getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
- taskRequest.getContainerId(), container.getHostName(), container.getPullServerPort()));
+ taskRequest.getContainerId(), container.getTaskHostName(), container.getTaskPort()));
taskRequest.getCallback().run(taskAssign.getProto());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/cluster/WorkerListener.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/cluster/WorkerListener.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/cluster/WorkerListener.java
deleted file mode 100644
index b5bb84c..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/cluster/WorkerListener.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.cluster;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.TajoIdProtos.QueryUnitAttemptIdProto;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.MasterWorkerProtos.QueryUnitRequestProto;
-import org.apache.tajo.engine.MasterWorkerProtos.TaskCompletionReport;
-import org.apache.tajo.engine.MasterWorkerProtos.TaskFatalErrorReport;
-import org.apache.tajo.engine.MasterWorkerProtos.TaskStatusProto;
-import org.apache.tajo.ipc.MasterWorkerProtocol;
-import org.apache.tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService;
-import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.master.event.TaskAttemptStatusUpdateEvent;
-import org.apache.tajo.master.event.TaskCompletionEvent;
-import org.apache.tajo.master.event.TaskFatalErrorEvent;
-import org.apache.tajo.rpc.ProtoAsyncRpcServer;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
-
-import java.net.InetSocketAddress;
-
-public class WorkerListener extends AbstractService
- implements MasterWorkerProtocolService.Interface {
-
- private final static Log LOG = LogFactory.getLog(WorkerListener.class);
- private MasterContext context;
- private ProtoAsyncRpcServer rpcServer;
- private InetSocketAddress bindAddr;
- private String addr;
-
- public WorkerListener(final MasterContext context) throws Exception {
- super(WorkerListener.class.getName());
- this.context = context;
-
- String confMasterAddr = context.getConf().getVar(ConfVars.TASKRUNNER_LISTENER_ADDRESS);
- InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterAddr);
- if (initIsa.getAddress() == null) {
- throw new IllegalArgumentException("Failed resolve of " + initIsa);
- }
- try {
- this.rpcServer = new ProtoAsyncRpcServer(MasterWorkerProtocol.class,
- this, initIsa);
- } catch (Exception e) {
- LOG.error(e);
- }
- this.rpcServer.start();
- this.bindAddr = rpcServer.getBindAddress();
- this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
-
- // Setup RPC server
- // Get the master address
- LOG.info(WorkerListener.class.getSimpleName() + " is bind to " + addr);
- context.getConf().setVar(TajoConf.ConfVars.TASKRUNNER_LISTENER_ADDRESS, addr);
- }
-
- @Override
- public void init(Configuration conf) {
- super.init(conf);
- }
-
- @Override
- public void start() {
- super.start();
- }
-
- @Override
- public void stop() {
- rpcServer.shutdown();
- super.stop();
- }
-
- public InetSocketAddress getBindAddress() {
- return this.bindAddr;
- }
-
- public String getAddress() {
- return this.addr;
- }
-
- static BoolProto TRUE_PROTO = BoolProto.newBuilder().setValue(true).build();
-
- @Override
- public void getTask(RpcController controller, ContainerIdProto request,
- RpcCallback<QueryUnitRequestProto> done) {
- //LOG.info("Get TaskRequest from " + request.getHost());
- //context.getEventHandler().handle(new TaskRequestEvent(new NodeIdPBImpl(request), done));
- }
-
- @Override
- public void statusUpdate(RpcController controller, TaskStatusProto request,
- RpcCallback<BoolProto> done) {
- QueryUnitAttemptId attemptId = new QueryUnitAttemptId(request.getId());
- context.getEventHandler().handle(new TaskAttemptStatusUpdateEvent(attemptId,
- request));
- done.run(TRUE_PROTO);
- }
-
- @Override
- public void ping(RpcController controller,
- QueryUnitAttemptIdProto attemptIdProto,
- RpcCallback<BoolProto> done) {
- QueryUnitAttemptId attemptId = new QueryUnitAttemptId(attemptIdProto);
- context.getQuery(attemptId.getQueryId()).getContext().getSubQuery(attemptId.getSubQueryId()).
- getQueryUnit(attemptId.getQueryUnitId()).getAttempt(attemptId).
- resetExpireTime();
- done.run(TRUE_PROTO);
- }
-
- @Override
- public void fatalError(RpcController controller, TaskFatalErrorReport report,
- RpcCallback<BoolProto> done) {
- context.getEventHandler().handle(new TaskFatalErrorEvent(report));
- done.run(TRUE_PROTO);
- }
-
- @Override
- public void done(RpcController controller, TaskCompletionReport report,
- RpcCallback<BoolProto> done) {
- context.getEventHandler().handle(new TaskCompletionEvent(report));
- done.run(TRUE_PROTO);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
index 4032c67..26c7231 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
@@ -19,7 +19,7 @@
package org.apache.tajo.master.event;
import org.apache.tajo.SubQueryId;
-import org.apache.tajo.master.SubQueryState;
+import org.apache.tajo.master.querymaster.SubQueryState;
public class SubQueryCompletedEvent extends QueryEvent {
private final SubQueryId subQueryId;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
index 3191639..d85d4f2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
@@ -20,7 +20,7 @@ package org.apache.tajo.master.event;
import org.apache.tajo.SubQueryId;
import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.master.SubQueryState;
+import org.apache.tajo.master.querymaster.SubQueryState;
public class SubQuerySucceeEvent extends SubQueryCompletedEvent {
private final TableMeta tableMeta;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
index 6409b43..bc84011 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
@@ -19,7 +19,7 @@
package org.apache.tajo.master.event;
import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.engine.MasterWorkerProtos.TaskStatusProto;
+import org.apache.tajo.ipc.QueryMasterProtocol.TaskStatusProto;
public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent {
private final TaskStatusProto status;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
index b36d69c..e3a4b5f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
@@ -19,7 +19,7 @@
package org.apache.tajo.master.event;
import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.engine.MasterWorkerProtos.TaskCompletionReport;
+import org.apache.tajo.ipc.QueryMasterProtocol.TaskCompletionReport;
public class TaskCompletionEvent extends TaskAttemptEvent {
private TaskCompletionReport report;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
index 3d1c78d..06fb392 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
@@ -19,7 +19,7 @@
package org.apache.tajo.master.event;
import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.engine.MasterWorkerProtos.TaskFatalErrorReport;
+import org.apache.tajo.ipc.QueryMasterProtocol.TaskFatalErrorReport;
public class TaskFatalErrorEvent extends TaskAttemptEvent {
private TaskFatalErrorReport report;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
index 25a8a14..166e103 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
@@ -21,7 +21,7 @@ package org.apache.tajo.master.event;
import com.google.protobuf.RpcCallback;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.engine.MasterWorkerProtos.QueryUnitRequestProto;
+import org.apache.tajo.ipc.QueryMasterProtocol.QueryUnitRequestProto;
import org.apache.tajo.master.event.TaskRequestEvent.TaskRequestEventType;
public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
new file mode 100644
index 0000000..3179abf
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.tajo.QueryConf;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.SubQueryId;
+import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableDescImpl;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.master.ExecutionBlock;
+import org.apache.tajo.master.ExecutionBlockCursor;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.querymaster.QueryMaster.QueryContext;
+import org.apache.tajo.storage.StorageManager;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class Query implements EventHandler<QueryEvent> {
+  private static final Log LOG = LogFactory.getLog(Query.class);
+
+  // Facilities for Query
+  private final QueryConf conf;
+  private final Clock clock;
+  private String queryStr;
+  // Mutated only from state-machine transitions, i.e. while writeLock is held.
+  private final Map<SubQueryId, SubQuery> subqueries;
+  private final EventHandler eventHandler;
+  private final MasterPlan plan;
+  private final StorageManager sm;
+  private QueryContext context;
+  private ExecutionBlockCursor cursor;
+
+  // Query Status
+  private final QueryId id;
+  private long appSubmitTime;
+  private long startTime;
+  private long initializationTime;
+  private long finishTime;
+  private TableDesc resultDesc;
+  private int completedSubQueryCount = 0;
+  private final List<String> diagnostics = new ArrayList<String>();
+
+  // Internal Variables
+  // writeLock is held for the whole of handle(), so transitions run exclusively;
+  // readLock lets other threads read the state and the subquery map safely.
+  private final Lock readLock;
+  private final Lock writeLock;
+  // Priority handed to the next SubQuery created; decremented after each use.
+  private int priority = 100;
+
+  // State Machine
+  private final StateMachine<QueryState, QueryEventType, QueryEvent> stateMachine;
+
+  private static final StateMachineFactory
+      <Query,QueryState,QueryEventType,QueryEvent> stateMachineFactory =
+      new StateMachineFactory<Query, QueryState, QueryEventType, QueryEvent>
+          (QueryState.QUERY_NEW)
+
+          .addTransition(QueryState.QUERY_NEW,
+              EnumSet.of(QueryState.QUERY_INIT, QueryState.QUERY_FAILED),
+              QueryEventType.INIT, new InitTransition())
+
+          .addTransition(QueryState.QUERY_INIT, QueryState.QUERY_RUNNING,
+              QueryEventType.START, new StartTransition())
+
+          .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
+              QueryEventType.INIT_COMPLETED, new InitCompleteTransition())
+          .addTransition(QueryState.QUERY_RUNNING,
+              EnumSet.of(QueryState.QUERY_RUNNING, QueryState.QUERY_SUCCEEDED,
+                  QueryState.QUERY_FAILED),
+              QueryEventType.SUBQUERY_COMPLETED,
+              new SubQueryCompletedTransition())
+          .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR, new InternalErrorTransition())
+          .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR)
+
+          .installTopology();
+
+  /**
+   * Creates a query in the {@code QUERY_NEW} state.
+   *
+   * @param context the owning query context; also supplies the {@link QueryConf}
+   * @param id the query id
+   * @param clock source of the start/initialization/finish timestamps
+   * @param appSubmitTime submit time reported by {@link #getAppSubmitTime()}
+   * @param queryStr the query text
+   * @param eventHandler receives {@code QueryFinishEvent}s and internal-error events
+   * @param plan the master plan whose execution blocks are run one by one
+   * @param sm storage manager used to create subqueries and write the result meta
+   */
+  public Query(final QueryContext context, final QueryId id, Clock clock,
+               final long appSubmitTime,
+               final String queryStr,
+               final EventHandler eventHandler,
+               final MasterPlan plan,
+               final StorageManager sm) {
+    this.context = context;
+    this.conf = context.getConf();
+    this.id = id;
+    this.clock = clock;
+    this.appSubmitTime = appSubmitTime;
+    this.queryStr = queryStr;
+    this.subqueries = Maps.newHashMap();
+    this.eventHandler = eventHandler;
+    this.plan = plan;
+    this.sm = sm;
+    this.cursor = new ExecutionBlockCursor(plan);
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+
+    stateMachine = stateMachineFactory.make(this);
+  }
+
+  public boolean isCreateTableStmt() {
+    return context.isCreateTableQuery();
+  }
+
+  /**
+   * Returns the progress of this query in [0, 1].
+   *
+   * <p>A succeeded query reports 1.0. Otherwise progress is the mean progress
+   * of the subqueries created so far, where a subquery still in {@code NEW}
+   * contributes 0. Subqueries are created one execution block at a time, so
+   * this is relative to the blocks scheduled so far, not to the whole plan.
+   * Returns 0.0 when no subquery has been created yet.
+   */
+  public float getProgress() {
+    List<SubQuery> snapshot;
+    readLock.lock();
+    try {
+      if (stateMachine.getCurrentState() == QueryState.QUERY_SUCCEEDED) {
+        return 1.0f;
+      }
+      // Copy under the lock: the map is mutated by transitions under writeLock.
+      // Per-subquery progress is read after releasing it, to avoid nesting locks.
+      snapshot = new ArrayList<SubQuery>(subqueries.values());
+    } finally {
+      readLock.unlock();
+    }
+
+    if (snapshot.isEmpty()) {
+      return 0.0f;
+    }
+
+    float progressSum = 0.0f;
+    boolean allSucceeded = true;
+    for (SubQuery subquery : snapshot) {
+      // Read the state once so both checks below agree.
+      SubQueryState subState = subquery.getState();
+      if (subState != SubQueryState.SUCCEEDED) {
+        // A NEW subquery has not finished either.
+        allSucceeded = false;
+      }
+      if (subState != SubQueryState.NEW) {
+        progressSum += subquery.getProgress();
+      }
+    }
+
+    if (allSucceeded) {
+      return 1.0f;
+    }
+    return progressSum / snapshot.size();
+  }
+
+  public long getAppSubmitTime() {
+    return this.appSubmitTime;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime() {
+    startTime = clock.getTime();
+  }
+
+  public long getInitializationTime() {
+    return initializationTime;
+  }
+
+  public void setInitializationTime() {
+    initializationTime = clock.getTime();
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public void setFinishTime() {
+    finishTime = clock.getTime();
+  }
+
+  /**
+   * Returns a snapshot of the diagnostic messages collected so far. The
+   * returned list is a copy, so later additions are not reflected in it.
+   */
+  public List<String> getDiagnostics() {
+    readLock.lock();
+    try {
+      return new ArrayList<String>(diagnostics);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  protected void addDiagnostic(String diag) {
+    writeLock.lock();
+    try {
+      diagnostics.add(diag);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public TableDesc getResultDesc() {
+    return resultDesc;
+  }
+
+  public void setResultDesc(TableDesc desc) {
+    resultDesc = desc;
+  }
+
+  public MasterPlan getPlan() {
+    return plan;
+  }
+
+  public StateMachine<QueryState, QueryEventType, QueryEvent> getStateMachine() {
+    return stateMachine;
+  }
+
+  /** Registers a subquery under its id. Safe to call from within a transition. */
+  public void addSubQuery(SubQuery subquery) {
+    writeLock.lock();
+    try {
+      subqueries.put(subquery.getId(), subquery);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public QueryId getId() {
+    return this.id;
+  }
+
+  /** Returns the subquery with the given id, or {@code null} if none was created. */
+  public SubQuery getSubQuery(SubQueryId id) {
+    readLock.lock();
+    try {
+      return this.subqueries.get(id);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public QueryState getState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public ExecutionBlockCursor getExecutionBlockCursor() {
+    return cursor;
+  }
+
+  /** NEW -> INIT: records the start time and publishes QUERY_INIT to the context. */
+  static class InitTransition
+      implements MultipleArcTransition<Query, QueryEvent, QueryState> {
+
+    @Override
+    public QueryState transition(Query query, QueryEvent queryEvent) {
+      query.setStartTime();
+      query.context.setState(QueryState.QUERY_INIT);
+      return QueryState.QUERY_INIT;
+    }
+  }
+
+  /** INIT -> RUNNING: creates the first subquery and sends it SQ_INIT. */
+  public static class StartTransition
+      implements SingleArcTransition<Query, QueryEvent> {
+
+    @Override
+    public void transition(Query query, QueryEvent queryEvent) {
+      SubQuery subQuery = new SubQuery(query.context, query.getExecutionBlockCursor().nextBlock(),
+          query.sm);
+      subQuery.setPriority(query.priority--);
+      query.addSubQuery(subQuery);
+      LOG.info("Schedule unit plan: \n" + subQuery.getBlock().getPlan());
+      subQuery.handle(new SubQueryEvent(subQuery.getId(),
+          SubQueryEventType.SQ_INIT));
+    }
+  }
+
+  /**
+   * Handles SUBQUERY_COMPLETED while RUNNING.
+   *
+   * <p>A non-succeeded subquery fails the query. Otherwise the next execution
+   * block, if any, is scheduled. When no block remains and every created
+   * subquery has completed, the result is recorded and the query succeeds.
+   */
+  public static class SubQueryCompletedTransition implements
+      MultipleArcTransition<Query, QueryEvent, QueryState> {
+
+    @Override
+    public QueryState transition(Query query, QueryEvent event) {
+      // increase the count for completed subqueries
+      query.completedSubQueryCount++;
+      SubQueryCompletedEvent castEvent = (SubQueryCompletedEvent) event;
+
+      if (castEvent.getFinalState() != SubQueryState.SUCCEEDED) {
+        // if at least one subquery is failed, the query is also failed.
+        // finished() records the finish time and publishes the state, as the
+        // other terminal paths do.
+        return query.finished(QueryState.QUERY_FAILED);
+      }
+
+      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+      if (cursor.hasNext()) {
+        SubQuery nextSubQuery = new SubQuery(query.context, cursor.nextBlock(), query.sm);
+        nextSubQuery.setPriority(query.priority--);
+        query.addSubQuery(nextSubQuery);
+        nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(),
+            SubQueryEventType.SQ_INIT));
+        LOG.info("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority());
+        LOG.info("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan());
+        return query.checkQueryForCompleted();
+      }
+
+      // No execution block remains. Only finish once every subquery created so
+      // far has completed; otherwise stay in the current (RUNNING) state.
+      QueryState completion = query.checkQueryForCompleted();
+      if (completion != QueryState.QUERY_SUCCEEDED) {
+        return completion;
+      }
+
+      SubQuery subQuery = query.getSubQuery(castEvent.getSubQueryId());
+      TableDesc desc = new TableDescImpl(query.conf.getOutputTable(),
+          subQuery.getTableMeta(), query.context.getOutputPath());
+      query.setResultDesc(desc);
+      try {
+        query.writeStat(query.context.getOutputPath(), subQuery);
+      } catch (IOException e) {
+        // Best effort, as before: the query is still reported as finished.
+        LOG.error("Failed to write the table meta of " + query.getId()
+            + " to " + query.context.getOutputPath(), e);
+      }
+      query.eventHandler.handle(new QueryFinishEvent(query.getId()));
+
+      // TODO: for CREATE TABLE queries (context.isCreateTableQuery()), register
+      // desc in the catalog; to be moved to QueryMasterManager.
+
+      return query.finished(QueryState.QUERY_SUCCEEDED);
+    }
+  }
+
+  /** Appends the diagnostic carried by a QueryDiagnosticsUpdateEvent. */
+  private static class DiagnosticsUpdateTransition implements
+      SingleArcTransition<Query, QueryEvent> {
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      query.addDiagnostic(((QueryDiagnosticsUpdateEvent) event)
+          .getDiagnosticUpdate());
+    }
+  }
+
+  /** Records the initialization time the first time INIT_COMPLETED arrives. */
+  private static class InitCompleteTransition implements
+      SingleArcTransition<Query, QueryEvent> {
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      if (query.initializationTime == 0) {
+        query.setInitializationTime();
+      }
+    }
+  }
+
+  /** RUNNING -> ERROR: records the finish time and publishes QUERY_ERROR. */
+  private static class InternalErrorTransition
+      implements SingleArcTransition<Query, QueryEvent> {
+
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      query.finished(QueryState.QUERY_ERROR);
+    }
+  }
+
+  /**
+   * Records the finish time and publishes {@code finalState} to the query
+   * context.
+   *
+   * @return {@code finalState}, so transitions can return it directly
+   */
+  public QueryState finished(QueryState finalState) {
+    setFinishTime();
+    context.setState(finalState);
+    return finalState;
+  }
+
+  /**
+   * Check if all subqueries of the query are completed.
+   * @return QueryState.QUERY_SUCCEEDED if every subquery created so far has
+   *         completed; otherwise the current state.
+   */
+  QueryState checkQueryForCompleted() {
+    if (completedSubQueryCount == subqueries.size()) {
+      return QueryState.QUERY_SUCCEEDED;
+    }
+    return getState();
+  }
+
+  /**
+   * Applies {@code event} to the state machine while holding the write lock.
+   * An invalid transition is logged and turned into an INTERNAL_ERROR event.
+   */
+  @Override
+  public void handle(QueryEvent event) {
+    LOG.info("Processing " + event.getQueryId() + " of type " + event.getType());
+    // Acquire before try: if lock() failed, finally must not unlock.
+    writeLock.lock();
+    try {
+      QueryState oldState = getState();
+      try {
+        getStateMachine().doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state", e);
+        eventHandler.handle(new QueryEvent(this.id,
+            QueryEventType.INTERNAL_ERROR));
+      }
+
+      // Log the state change, if any.
+      QueryState newState = getState();
+      if (oldState != newState) {
+        LOG.info(id + " Query Transitioned from " + oldState + " to "
+            + newState);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /** Writes the table meta of {@code subQuery} to {@code outputPath}. */
+  private void writeStat(Path outputPath, SubQuery subQuery)
+      throws IOException {
+    sm.writeTableMeta(outputPath, subQuery.getTableMeta());
+  }
+}