You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/08/26 14:29:16 UTC
[7/8] TAJO-127: Implement Tajo Resource Manager. (hyoungjunkim via
hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java
index 7ae01d6..044f0ae 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java
@@ -21,417 +21,52 @@ package org.apache.tajo.master;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.ProtoUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.tajo.QueryConf;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.querymaster.QueryMaster;
-import org.apache.tajo.pullserver.PullServerAuxService;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
-import java.util.*;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
public abstract class ContainerProxy {
- private static final Log LOG = LogFactory.getLog(ContainerProxy.class);
+ protected static final Log LOG = LogFactory.getLog(ContainerProxy.class);
final public static FsPermission QUERYCONF_FILE_PERMISSION =
FsPermission.createImmutable((short) 0644); // rw-r--r--
- private final static RecordFactory recordFactory =
- RecordFactoryProvider.getRecordFactory(null);
- private static enum ContainerState {
+ protected static enum ContainerState {
PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
}
- private final YarnRPC yarnRPC;
- private Configuration conf;
- private QueryMaster.QueryContext context;
+ protected final ExecutionBlockId executionBlockId;
+ protected Configuration conf;
+ protected QueryMasterTask.QueryContext context;
- private ContainerState state;
+ protected ContainerState state;
// store enough information to be able to cleanup the container
- private Container container;
- private ContainerId containerID;
- final private String containerMgrAddress;
- private ContainerToken containerToken;
- private String hostName;
- private int port = -1;
+ protected Container container;
+ protected ContainerId containerID;
+ protected String hostName;
+ protected int port = -1;
- protected abstract void containerStarted();
- protected abstract String getId();
- protected abstract String getRunnerClass();
- protected abstract Vector<CharSequence> getTaskParams();
+ public abstract void launch(ContainerLaunchContext containerLaunchContext);
+ public abstract void stopContainer();
- public ContainerProxy(QueryMaster.QueryContext context, Configuration conf, YarnRPC yarnRPC, Container container) {
+ public ContainerProxy(QueryMasterTask.QueryContext context, Configuration conf,
+ ExecutionBlockId executionBlockId, Container container) {
this.context = context;
this.conf = conf;
- this.yarnRPC = yarnRPC;
this.state = ContainerState.PREP;
this.container = container;
+ this.executionBlockId = executionBlockId;
this.containerID = container.getId();
- NodeId nodeId = container.getNodeId();
- this.containerMgrAddress = nodeId.getHost() + ":" + nodeId.getPort();
- this.containerToken = container.getContainerToken();
- }
-
- protected ContainerManager getCMProxy(ContainerId containerID,
- final String containerManagerBindAddr,
- ContainerToken containerToken)
- throws IOException {
- String [] hosts = containerManagerBindAddr.split(":");
- final InetSocketAddress cmAddr =
- new InetSocketAddress(hosts[0], Integer.parseInt(hosts[1]));
- UserGroupInformation user = UserGroupInformation.getCurrentUser();
-
- if (UserGroupInformation.isSecurityEnabled()) {
- Token<ContainerTokenIdentifier> token =
- ProtoUtils.convertFromProtoFormat(containerToken, cmAddr);
- // the user in createRemoteUser in this context has to be ContainerID
- user = UserGroupInformation.createRemoteUser(containerID.toString());
- user.addToken(token);
- }
-
- ContainerManager proxy = user.doAs(new PrivilegedAction<ContainerManager>() {
- @Override
- public ContainerManager run() {
- return (ContainerManager) yarnRPC.getProxy(ContainerManager.class, cmAddr, conf);
- }
- });
-
- return proxy;
}
public synchronized boolean isCompletelyDone() {
return state == ContainerState.DONE || state == ContainerState.FAILED;
}
- @SuppressWarnings("unchecked")
- public synchronized void launch(ContainerLaunchContext commonContainerLaunchContext) {
- LOG.info("Launching Container with Id: " + containerID);
- if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
- state = ContainerState.DONE;
- LOG.error("Container (" + containerID + " was killed before it was launched");
- return;
- }
-
- ContainerManager proxy = null;
- try {
-
- proxy = getCMProxy(containerID, containerMgrAddress,
- containerToken);
-
- // Construct the actual Container
- ContainerLaunchContext containerLaunchContext = createContainerLaunchContext(commonContainerLaunchContext);
-
- // Now launch the actual container
- StartContainerRequest startRequest = Records
- .newRecord(StartContainerRequest.class);
- startRequest.setContainerLaunchContext(containerLaunchContext);
- StartContainerResponse response = proxy.startContainer(startRequest);
-
- ByteBuffer portInfo = response
- .getServiceResponse(PullServerAuxService.PULLSERVER_SERVICEID);
-
- if(portInfo != null) {
- port = PullServerAuxService.deserializeMetaData(portInfo);
- }
-
- LOG.info("PullServer port returned by ContainerManager for "
- + containerID + " : " + port);
-
- if(port < 0) {
- this.state = ContainerState.FAILED;
- throw new IllegalStateException("Invalid shuffle port number "
- + port + " returned for " + containerID);
- }
-
- containerStarted();
-
- this.state = ContainerState.RUNNING;
- this.hostName = containerMgrAddress.split(":")[0];
- context.addContainer(containerID, this);
- } catch (Throwable t) {
- String message = "Container launch failed for " + containerID + " : "
- + StringUtils.stringifyException(t);
- this.state = ContainerState.FAILED;
- LOG.error(message);
- } finally {
- if (proxy != null) {
- yarnRPC.stopProxy(proxy, conf);
- }
- }
- }
-
- public synchronized void kill() {
-
- if(isCompletelyDone()) {
- return;
- }
- if(this.state == ContainerState.PREP) {
- this.state = ContainerState.KILLED_BEFORE_LAUNCH;
- } else {
- LOG.info("KILLING " + containerID);
-
- ContainerManager proxy = null;
- try {
- proxy = getCMProxy(this.containerID, this.containerMgrAddress,
- this.containerToken);
-
- // kill the remote container if already launched
- StopContainerRequest stopRequest = Records
- .newRecord(StopContainerRequest.class);
- stopRequest.setContainerId(this.containerID);
- proxy.stopContainer(stopRequest);
- // If stopContainer returns without an error, assuming the stop made
- // it over to the NodeManager.
-// context.getEventHandler().handle(
-// new AMContainerEvent(containerID, AMContainerEventType.C_NM_STOP_SENT));
- context.removeContainer(containerID);
- } catch (Throwable t) {
-
- // ignore the cleanup failure
- String message = "cleanup failed for container "
- + this.containerID + " : "
- + StringUtils.stringifyException(t);
- LOG.warn(message);
- this.state = ContainerState.DONE;
- return;
- } finally {
- if (proxy != null) {
- yarnRPC.stopProxy(proxy, conf);
- }
- }
- this.state = ContainerState.DONE;
- }
- }
-
- public static ContainerLaunchContext createCommonContainerLaunchContext(Configuration config, String queryId, boolean isMaster) {
- TajoConf conf = (TajoConf)config;
-
- ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
- try {
- ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // Set the env variables to be setup
- ////////////////////////////////////////////////////////////////////////////
- LOG.info("Set the environment for the application master");
-
- Map<String, String> environment = new HashMap<String, String>();
- //String initialClassPath = getInitialClasspath(conf);
- environment.put(ApplicationConstants.Environment.SHELL.name(), "/bin/bash");
- if(System.getenv(ApplicationConstants.Environment.JAVA_HOME.name()) != null) {
- environment.put(ApplicationConstants.Environment.JAVA_HOME.name(),
- System.getenv(ApplicationConstants.Environment.JAVA_HOME.name()));
- }
-
- // TODO - to be improved with org.apache.tajo.sh shell script
- Properties prop = System.getProperties();
-
- if (prop.getProperty("tajo.test", "FALSE").equalsIgnoreCase("TRUE") ||
- (System.getenv("tajo.test") != null && System.getenv("tajo.test").equalsIgnoreCase("TRUE"))) {
- LOG.info("tajo.test is TRUE");
- environment.put(ApplicationConstants.Environment.CLASSPATH.name(), prop.getProperty("java.class.path", null));
- environment.put("tajo.test", "TRUE");
- } else {
- // Add AppMaster.jar location to classpath
- // At some point we should not be required to add
- // the hadoop specific classpaths to the env.
- // It should be provided out of the box.
- // For now setting all required classpaths including
- // the classpath to "." for the application jar
- StringBuilder classPathEnv = new StringBuilder("./");
- //for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) {
- for (String c : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH) {
- classPathEnv.append(':');
- classPathEnv.append(c.trim());
- }
-
- classPathEnv.append(":" + System.getenv("TAJO_BASE_CLASSPATH"));
- classPathEnv.append(":./log4j.properties:./*");
- if(System.getenv("HADOOP_HOME") != null) {
- environment.put("HADOOP_HOME", System.getenv("HADOOP_HOME"));
- environment.put(ApplicationConstants.Environment.HADOOP_COMMON_HOME.name(), System.getenv("HADOOP_HOME"));
- environment.put(ApplicationConstants.Environment.HADOOP_HDFS_HOME.name(), System.getenv("HADOOP_HOME"));
- environment.put(ApplicationConstants.Environment.HADOOP_YARN_HOME.name(), System.getenv("HADOOP_HOME"));
- }
-
- if(System.getenv("TAJO_BASE_CLASSPATH") != null) {
- environment.put("TAJO_BASE_CLASSPATH", System.getenv("TAJO_BASE_CLASSPATH"));
- }
- environment.put(ApplicationConstants.Environment.CLASSPATH.name(), classPathEnv.toString());
- }
-
- ctx.setEnvironment(environment);
-
- ////////////////////////////////////////////////////////////////////////////
- // Set the local resources
- ////////////////////////////////////////////////////////////////////////////
- Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
- FileSystem fs = null;
-
- LOG.info("defaultFS: " + conf.get("fs.defaultFS"));
-
- try {
- fs = FileSystem.get(conf);
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
- }
-
- FileContext fsCtx = null;
- try {
- fsCtx = FileContext.getFileContext(conf);
- } catch (UnsupportedFileSystemException e) {
- LOG.error(e.getMessage(), e);
- }
-
-
- LOG.info("Writing a QueryConf to HDFS and add to local environment");
- try {
- // TODO move to tajo temp
- Path warehousePath = new Path(conf.getVar(TajoConf.ConfVars.ROOT_DIR), TajoConstants.WAREHOUSE_DIR);
- Path queryConfPath = new Path(warehousePath, queryId);
- if(isMaster) {
- queryConfPath = new Path(queryConfPath, QueryConf.QUERY_MASTER_FILENAME);
- } else {
- queryConfPath = new Path(queryConfPath, QueryConf.FILENAME);
- }
-
- if(!fs.exists(queryConfPath)){
- writeConf(conf, queryConfPath);
- } else {
- LOG.warn("QueryConf already exist. path: " + queryConfPath.toString());
- }
- LocalResource queryConfSrc = createApplicationResource(fsCtx, queryConfPath, LocalResourceType.FILE);
-
- localResources.put(queryConfPath.getName(), queryConfSrc);
- ctx.setLocalResources(localResources);
-
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
- }
-
- // TODO - move to sub-class
- // Add shuffle token
- Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
- try {
- serviceData.put(PullServerAuxService.PULLSERVER_SERVICEID, PullServerAuxService.serializeMetaData(0));
- } catch (IOException ioe) {
- LOG.error(ioe);
- }
- ctx.setServiceData(serviceData);
-
- return ctx;
- }
-
- private static LocalResource createApplicationResource(FileContext fs,
- Path p, LocalResourceType type)
- throws IOException {
- LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
- FileStatus rsrcStat = fs.getFileStatus(p);
- rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
- .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
- rsrc.setSize(rsrcStat.getLen());
- rsrc.setTimestamp(rsrcStat.getModificationTime());
- rsrc.setType(type);
- rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
- return rsrc;
- }
-
- private static void writeConf(Configuration conf, Path queryConfFile)
- throws IOException {
- // Write job file to Tajo's fs
- FileSystem fs = queryConfFile.getFileSystem(conf);
- FSDataOutputStream out =
- FileSystem.create(fs, queryConfFile,
- new FsPermission(QUERYCONF_FILE_PERMISSION));
- try {
- conf.writeXml(out);
- } finally {
- out.close();
- }
- }
-
- public ContainerLaunchContext createContainerLaunchContext(ContainerLaunchContext commonContainerLaunchContext) {
- // Setup environment by cloning from common env.
- Map<String, String> env = commonContainerLaunchContext.getEnvironment();
- Map<String, String> myEnv = new HashMap<String, String>(env.size());
- myEnv.putAll(env);
-
- // Duplicate the ByteBuffers for access by multiple containers.
- Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
- for (Map.Entry<String, ByteBuffer> entry : commonContainerLaunchContext.getServiceData().entrySet()) {
- myServiceData.put(entry.getKey(), entry.getValue().duplicate());
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // Set the local resources
- ////////////////////////////////////////////////////////////////////////////
- // Set the necessary command to execute the application master
- Vector<CharSequence> vargs = new Vector<CharSequence>(30);
-
- // Set java executable command
- //LOG.info("Setting up app master command");
- vargs.add("${JAVA_HOME}" + "/bin/java");
- // Set Xmx based on am memory size
- vargs.add("-Xmx2000m");
- // Set Remote Debugging
- //if (!context.getQuery().getSubQuery(event.getSubQueryId()).isLeafQuery()) {
- //vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
- //}
- // Set class name
- vargs.add(getRunnerClass());
- vargs.add(getId()); // subqueryId
- vargs.add(containerMgrAddress); // nodeId
- vargs.add(containerID.toString()); // containerId
- Vector<CharSequence> taskParams = getTaskParams();
- if(taskParams != null) {
- vargs.addAll(taskParams);
- }
-
- vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
- vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
-
- // Get final commmand
- StringBuilder command = new StringBuilder();
- for (CharSequence str : vargs) {
- command.append(str).append(" ");
- }
-
- LOG.info("Completed setting up TaskRunner command " + command.toString());
- List<String> commands = new ArrayList<String>();
- commands.add(command.toString());
-
- return BuilderUtils.newContainerLaunchContext(containerID, commonContainerLaunchContext.getUser(),
- container.getResource(), commonContainerLaunchContext.getLocalResources(), myEnv, commands,
- myServiceData, null, new HashMap<ApplicationAccessType, String>());
- }
-
public String getTaskHostName() {
return this.hostName;
}
@@ -439,4 +74,8 @@ public abstract class ContainerProxy {
public int getTaskPort() {
return this.port;
}
+
+ public String getId() {
+ return executionBlockId.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
index 36327ff..a92ef75 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
@@ -15,7 +15,7 @@
package org.apache.tajo.master;
import com.google.common.base.Preconditions;
-import org.apache.tajo.SubQueryId;
+import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.engine.planner.logical.*;
@@ -40,7 +40,7 @@ public class ExecutionBlock {
RANGE
}
- private SubQueryId subQueryId;
+ private ExecutionBlockId executionBlockId;
private LogicalNode plan = null;
private StoreTableNode store = null;
private List<ScanNode> scanlist = new ArrayList<ScanNode>();
@@ -50,12 +50,12 @@ public class ExecutionBlock {
private boolean hasJoinPlan;
private boolean hasUnionPlan;
- public ExecutionBlock(SubQueryId subQueryId) {
- this.subQueryId = subQueryId;
+ public ExecutionBlock(ExecutionBlockId executionBlockId) {
+ this.executionBlockId = executionBlockId;
}
- public SubQueryId getId() {
- return subQueryId;
+ public ExecutionBlockId getId() {
+ return executionBlockId;
}
public String getOutputName() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 8c3617e..ee4b98d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -23,16 +23,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.tajo.QueryConf;
import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoProtos;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.CatalogUtil;
@@ -48,18 +43,19 @@ import org.apache.tajo.engine.exception.UnknownWorkerException;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.global.GlobalOptimizer;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.logical.CreateTableNode;
+import org.apache.tajo.engine.planner.logical.DropTableNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.LogicalRootNode;
+import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.master.querymaster.QueryMasterManager;
+import org.apache.tajo.master.querymaster.QueryInfo;
+import org.apache.tajo.master.querymaster.QueryJobManager;
import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.util.TajoIdUtils;
import java.io.IOException;
import java.sql.SQLException;
-import java.util.EnumSet;
-import java.util.Set;
@SuppressWarnings("unchecked")
public class GlobalEngine extends AbstractService {
@@ -76,9 +72,6 @@ public class GlobalEngine extends AbstractService {
private GlobalPlanner globalPlanner;
private GlobalOptimizer globalOptimizer;
- // Yarn
- protected YarnClient yarnClient;
-
public GlobalEngine(final MasterContext context)
throws IOException {
super(GlobalEngine.class.getName());
@@ -89,7 +82,6 @@ public class GlobalEngine extends AbstractService {
public void start() {
try {
- connectYarnClient();
analyzer = new SQLAnalyzer();
planner = new LogicalPlanner(context.getCatalog());
optimizer = new LogicalOptimizer();
@@ -98,138 +90,60 @@ public class GlobalEngine extends AbstractService {
globalOptimizer = new GlobalOptimizer();
} catch (Throwable t) {
- t.printStackTrace();
+ LOG.error(t.getMessage(), t);
}
super.start();
}
public void stop() {
super.stop();
- if (yarnClient != null) {
- yarnClient.stop();
- }
}
- public QueryId executeQuery(String tql)
+ public ClientProtos.GetQueryStatusResponse executeQuery(String sql)
throws InterruptedException, IOException,
NoSuchQueryIdException, IllegalQueryStatusException,
UnknownWorkerException, EmptyClusterException {
- long querySubmittionTime = context.getClock().getTime();
- LOG.info("SQL: " + tql);
+ LOG.info("SQL: " + sql);
// parse the query
- Expr planningContext = analyzer.parse(tql);
+ Expr planningContext = analyzer.parse(sql);
LogicalRootNode plan = (LogicalRootNode) createLogicalPlan(planningContext);
+ ClientProtos.GetQueryStatusResponse.Builder responseBuilder = ClientProtos.GetQueryStatusResponse.newBuilder();
+
if (PlannerUtil.checkIfDDLPlan(plan)) {
updateQuery(plan.getChild());
- return TajoIdUtils.NullQueryId;
+
+ responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+ responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+ responseBuilder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
} else {
- GetNewApplicationResponse newApp = yarnClient.getNewApplication();
- ApplicationId appId = newApp.getApplicationId();
- QueryId queryId = TajoIdUtils.createQueryId(appId, 0);
-
- LOG.info("Get AppId: " + appId + ", QueryId: " + queryId);
- LOG.info("Setting up application submission context for ASM");
-
- //request QueryMaster container
- QueryConf queryConf = new QueryConf(context.getConf());
- queryConf.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
- // the output table is given by user
- if (plan.getChild().getType() == NodeType.CREATE_TABLE) {
- CreateTableNode createTableNode = (CreateTableNode) plan.getChild();
- queryConf.setOutputTable(createTableNode.getTableName());
+ QueryJobManager queryJobManager = context.getQueryJobManager();
+ QueryInfo queryInfo = null;
+ try {
+ queryInfo = queryJobManager.createNewQueryJob(sql, plan);
+ } catch (Exception e) {
+ responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+ responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
+ responseBuilder.setState(TajoProtos.QueryState.QUERY_ERROR);
+ responseBuilder.setErrorMessage(StringUtils.stringifyException(e));
+
+ return responseBuilder.build();
}
- QueryMasterManager queryMasterManager = new QueryMasterManager(context, yarnClient, queryId, tql, plan, appId,
- context.getClock(), querySubmittionTime);
- queryMasterManager.init(queryConf);
- queryMasterManager.start();
- context.addQuery(queryId, queryMasterManager);
-
- return queryId;
- }
- }
-
- private ApplicationAttemptId submitQuery() throws YarnRemoteException {
- GetNewApplicationResponse newApp = getNewApplication();
- ApplicationId appId = newApp.getApplicationId();
- LOG.info("Get AppId: " + appId);
- LOG.info("Setting up application submission context for ASM");
- ApplicationSubmissionContext appContext = Records
- .newRecord(ApplicationSubmissionContext.class);
+ //queryJobManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInfo));
- // set the application id
- appContext.setApplicationId(appId);
- // set the application name
- appContext.setApplicationName("Tajo");
-
- org.apache.hadoop.yarn.api.records.Priority
- pri = Records.newRecord(org.apache.hadoop.yarn.api.records.Priority.class);
- pri.setPriority(5);
- appContext.setPriority(pri);
-
- // Set the queue to which this application is to be submitted in the RM
- appContext.setQueue("default");
-
- ContainerLaunchContext amContainer = Records
- .newRecord(ContainerLaunchContext.class);
- appContext.setAMContainerSpec(amContainer);
-
- LOG.info("Submitting application to ASM");
- yarnClient.submitApplication(appContext);
-
- ApplicationReport appReport = monitorApplication(appId,
- EnumSet.of(YarnApplicationState.ACCEPTED));
- ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
- LOG.info("Launching application with id: " + attemptId);
-
- return attemptId;
- }
+ responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
+ responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+ responseBuilder.setState(queryInfo.getQueryState());
+ if(queryInfo.getQueryMasterHost() != null) {
+ responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
+ }
+ responseBuilder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
+ }
- private ApplicationAttemptId submitQueryOld() throws YarnRemoteException {
- GetNewApplicationResponse newApp = getNewApplication();
- // Get a new application id
- ApplicationId appId = newApp.getApplicationId();
- LOG.info("Get AppId: " + appId);
- LOG.info("Setting up application submission context for ASM");
- ApplicationSubmissionContext appContext = Records
- .newRecord(ApplicationSubmissionContext.class);
-
- // set the application id
- appContext.setApplicationId(appId);
- // set the application name
- appContext.setApplicationName("Tajo");
-
- // Set the priority for the application master
- org.apache.hadoop.yarn.api.records.Priority
- pri = Records.newRecord(org.apache.hadoop.yarn.api.records.Priority.class);
- pri.setPriority(5);
- appContext.setPriority(pri);
-
- // Set the queue to which this application is to be submitted in the RM
- appContext.setQueue("default");
-
- // Set up the container launch context for the application master
- ContainerLaunchContext amContainer = Records
- .newRecord(ContainerLaunchContext.class);
- appContext.setAMContainerSpec(amContainer);
-
- // unmanaged AM
- appContext.setUnmanagedAM(true);
- LOG.info("Setting unmanaged AM");
-
- // Submit the application to the applications manager
- LOG.info("Submitting application to ASM");
- yarnClient.submitApplication(appContext);
-
- // Monitor the application to wait for launch state
- ApplicationReport appReport = monitorApplication(appId,
- EnumSet.of(YarnApplicationState.ACCEPTED));
- ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
- LOG.info("Launching application with id: " + attemptId);
-
- return attemptId;
+ ClientProtos.GetQueryStatusResponse response = responseBuilder.build();
+ return response;
}
public QueryId updateQuery(String sql) throws IOException, SQLException {
@@ -242,7 +156,7 @@ public class GlobalEngine extends AbstractService {
throw new SQLException("This is not update query:\n" + sql);
} else {
updateQuery(plan.getChild());
- return TajoIdUtils.NullQueryId;
+ return QueryIdFactory.NULL_QUERY_ID;
}
}
@@ -272,24 +186,14 @@ public class GlobalEngine extends AbstractService {
} catch (PlanningException e) {
LOG.error(e.getMessage(), e);
}
- LOG.info("LogicalPlan:\n" + plan.getRootBlock().getRoot());
- return optimizedPlan;
- }
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("LogicalPlan:\n" + plan.getRootBlock().getRoot());
+ }
- private MasterPlan createGlobalPlan(QueryId id, LogicalRootNode rootNode)
- throws IOException {
- MasterPlan globalPlan = globalPlanner.build(id, rootNode);
- return globalOptimizer.optimize(globalPlan);
+ return optimizedPlan;
}
-// private void startQuery(final QueryId queryId, final QueryConf queryConf,
-// final QueryMaster query) {
-// context.getAllQueries().put(queryId, query);
-// query.init(queryConf);
-// query.start();
-// }
-
private TableDesc createTable(CreateTableNode createTable) throws IOException {
TableMeta meta;
@@ -368,58 +272,4 @@ public class GlobalEngine extends AbstractService {
LOG.info("Table \"" + tableName + "\" is dropped.");
}
-
- private void connectYarnClient() {
- this.yarnClient = new YarnClientImpl();
- this.yarnClient.init(context.getConf());
- this.yarnClient.start();
- }
-
- public GetNewApplicationResponse getNewApplication()
- throws YarnRemoteException {
- return yarnClient.getNewApplication();
- }
-
- /**
- * Monitor the submitted application for completion. Kill application if time
- * expires.
- *
- * @param appId
- * Application Id of application to be monitored
- * @return true if application completed successfully
- * @throws YarnRemoteException
- */
- private ApplicationReport monitorApplication(ApplicationId appId,
- Set<YarnApplicationState> finalState) throws YarnRemoteException {
-
- while (true) {
-
- // Check app status every 1 second.
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- LOG.debug("Thread sleep in monitoring loop interrupted");
- }
-
- // Get application report for the appId we are interested in
- ApplicationReport report = yarnClient.getApplicationReport(appId);
-
- LOG.info("Got application report from ASM for" + ", appId="
- + appId.getId() + ", appAttemptId="
- + report.getCurrentApplicationAttemptId() + ", clientToken="
- + report.getClientToken() + ", appDiagnostics="
- + report.getDiagnostics() + ", appMasterHost=" + report.getHost()
- + ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
- + report.getRpcPort() + ", appStartTime=" + report.getStartTime()
- + ", yarnAppState=" + report.getYarnApplicationState().toString()
- + ", distributedFinalState="
- + report.getFinalApplicationStatus().toString() + ", appTrackingUrl="
- + report.getTrackingUrl() + ", appUser=" + report.getUser());
-
- YarnApplicationState state = report.getYarnApplicationState();
- if (finalState.contains(state)) {
- return report;
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
index 8b6ba94..14c8bfd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
@@ -21,9 +21,9 @@ package org.apache.tajo.master;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.SubQueryId;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.conf.TajoConf;
@@ -75,9 +75,9 @@ public class GlobalPlanner {
// insert store at the subnode of the root
UnaryNode root = rootNode;
if (root.getChild().getType() != NodeType.STORE) {
- SubQueryId subQueryId = QueryIdFactory.newSubQueryId(this.queryId);
- outputTableName = subQueryId.toString();
- insertStore(subQueryId.toString(),root).setLocal(false);
+ ExecutionBlockId executionBlockId = QueryIdFactory.newExecutionBlockId(this.queryId);
+ outputTableName = executionBlockId.toString();
+ insertStore(executionBlockId.toString(),root).setLocal(false);
}
// convert 2-phase plan
@@ -113,10 +113,10 @@ public class GlobalPlanner {
if (groupby.getChild().getType() != NodeType.UNION &&
groupby.getChild().getType() != NodeType.STORE &&
groupby.getChild().getType() != NodeType.SCAN) {
- tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+ tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
insertStore(tableId, groupby);
}
- tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+ tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
// insert (a store for the first group by) and (a second group by)
PlannerUtil.transformGroupbyTo2PWithStore((GroupbyNode)node, tableId);
} else if (node.getType() == NodeType.SORT) {
@@ -126,10 +126,10 @@ public class GlobalPlanner {
if (sort.getChild().getType() != NodeType.UNION &&
sort.getChild().getType() != NodeType.STORE &&
sort.getChild().getType() != NodeType.SCAN) {
- tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+ tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
insertStore(tableId, sort);
}
- tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+ tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
// insert (a store for the first sort) and (a second sort)
PlannerUtil.transformSortTo2PWithStore((SortNode)node, tableId);
} else if (node.getType() == NodeType.JOIN) {
@@ -138,8 +138,8 @@ public class GlobalPlanner {
JoinNode join = (JoinNode) node;
/*
- if (join.getOuterNode().getType() == NodeType.SCAN &&
- join.getInnerNode().getType() == NodeType.SCAN) {
+ if (join.getOuterNode().getType() == ExprType.SCAN &&
+ join.getInnerNode().getType() == ExprType.SCAN) {
ScanNode outerScan = (ScanNode) join.getOuterNode();
ScanNode innerScan = (ScanNode) join.getInnerNode();
@@ -198,14 +198,14 @@ public class GlobalPlanner {
// insert stores for the first phase
if (join.getLeftChild().getType() != NodeType.UNION &&
join.getLeftChild().getType() != NodeType.STORE) {
- tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+ tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
store = new StoreTableNode(tableId);
store.setLocal(true);
PlannerUtil.insertOuterNode(node, store);
}
if (join.getRightChild().getType() != NodeType.UNION &&
join.getRightChild().getType() != NodeType.STORE) {
- tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+ tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
store = new StoreTableNode(tableId);
store.setLocal(true);
PlannerUtil.insertInnerNode(node, store);
@@ -216,7 +216,7 @@ public class GlobalPlanner {
// insert stores
if (union.getLeftChild().getType() != NodeType.UNION &&
union.getLeftChild().getType() != NodeType.STORE) {
- tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+ tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
store = new StoreTableNode(tableId);
if(union.getLeftChild().getType() == NodeType.GROUP_BY) {
/*This case is for cube by operator
@@ -230,7 +230,7 @@ public class GlobalPlanner {
}
if (union.getRightChild().getType() != NodeType.UNION &&
union.getRightChild().getType() != NodeType.STORE) {
- tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+ tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
store = new StoreTableNode(tableId);
if(union.getRightChild().getType() == NodeType.GROUP_BY) {
/*This case is for cube by operator
@@ -246,7 +246,7 @@ public class GlobalPlanner {
UnaryNode unary = (UnaryNode)node;
if (unary.getType() != NodeType.STORE &&
unary.getChild().getType() != NodeType.STORE) {
- tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+ tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
insertStore(tableId, unary);
}
}
@@ -283,11 +283,11 @@ public class GlobalPlanner {
if (node.getType() == NodeType.STORE) {
store = (StoreTableNode) node;
- SubQueryId id;
- if (store.getTableName().startsWith(QueryId.PREFIX)) {
- id = TajoIdUtils.newSubQueryId(store.getTableName());
+ ExecutionBlockId id;
+ if (store.getTableName().startsWith(ExecutionBlockId.EB_ID_PREFIX)) {
+ id = TajoIdUtils.createExecutionBlockId(store.getTableName());
} else {
- id = QueryIdFactory.newSubQueryId(queryId);
+ id = QueryIdFactory.newExecutionBlockId(queryId);
}
subQuery = new ExecutionBlock(id);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
new file mode 100644
index 0000000..8f83557
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TajoAsyncDispatcher extends AbstractService implements Dispatcher {
+
+ private static final Log LOG = LogFactory.getLog(TajoAsyncDispatcher.class);
+
+ private final BlockingQueue<Event> eventQueue;
+ private volatile boolean stopped = false;
+
+ private Thread eventHandlingThread;
+ protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
+ private boolean exitOnDispatchException;
+
+ private String id;
+
+ public TajoAsyncDispatcher(String id) {
+ this(id, new LinkedBlockingQueue<Event>());
+ }
+
+ public TajoAsyncDispatcher(String id, BlockingQueue<Event> eventQueue) {
+ super(TajoAsyncDispatcher.class.getName());
+ this.id = id;
+ this.eventQueue = eventQueue;
+ this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
+ }
+
+ Runnable createThread() {
+ return new Runnable() {
+ @Override
+ public void run() {
+ while (!stopped && !Thread.currentThread().isInterrupted()) {
+ Event event;
+ try {
+ event = eventQueue.take();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(id + ",event take:" + event.getType() + "," + event);
+ }
+ } catch(InterruptedException ie) {
+ if (!stopped) {
+ LOG.warn("AsyncDispatcher thread interrupted", ie);
+ }
+ return;
+ }
+ dispatch(event);
+ }
+ }
+ };
+ }
+
+ @Override
+ public synchronized void init(Configuration conf) {
+ this.exitOnDispatchException =
+ conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
+ Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
+ super.init(conf);
+ }
+
+ @Override
+ public void start() {
+ //start all the components
+ super.start();
+ eventHandlingThread = new Thread(createThread());
+ eventHandlingThread.setName("AsyncDispatcher event handler");
+ eventHandlingThread.start();
+
+ LOG.info("AsyncDispatcher started:" + id);
+ }
+
+ @Override
+ public synchronized void stop() {
+ if(stopped) {
+ return;
+ }
+ stopped = true;
+ if (eventHandlingThread != null) {
+ eventHandlingThread.interrupt();
+ try {
+ eventHandlingThread.join();
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted Exception while stopping", ie);
+ }
+ }
+
+ // stop all the components
+ super.stop();
+
+ LOG.info("AsyncDispatcher stopped:" + id);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void dispatch(Event event) {
+ //all events go thru this loop
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Dispatching the event " + event.getClass().getName() + "."
+ + event.toString());
+ }
+// LOG.info("====> Dispatching the event " + event.getClass().getName() + "."
+// + event.toString() );
+ Class<? extends Enum> type = event.getType().getDeclaringClass();
+
+ try{
+ EventHandler handler = eventDispatchers.get(type);
+ if(handler != null) {
+ handler.handle(event);
+ } else {
+ throw new Exception("No handler for registered for " + type);
+ }
+ } catch (Throwable t) {
+ //TODO Maybe log the state of the queue
+ LOG.fatal("Error in dispatcher thread:" + event.getType(), t);
+ if (exitOnDispatchException && (ShutdownHookManager.get().isShutdownInProgress()) == false) {
+ LOG.info("Exiting, bye..");
+ System.exit(-1);
+ }
+ } finally {
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void register(Class<? extends Enum> eventType,
+ EventHandler handler) {
+ /* check to see if we have a listener registered */
+ EventHandler<Event> registeredHandler = (EventHandler<Event>)
+ eventDispatchers.get(eventType);
+ LOG.debug("Registering " + eventType + " for " + handler.getClass());
+ if (registeredHandler == null) {
+ eventDispatchers.put(eventType, handler);
+ } else if (!(registeredHandler instanceof MultiListenerHandler)){
+ /* for multiple listeners of an event add the multiple listener handler */
+ MultiListenerHandler multiHandler = new MultiListenerHandler();
+ multiHandler.addHandler(registeredHandler);
+ multiHandler.addHandler(handler);
+ eventDispatchers.put(eventType, multiHandler);
+ } else {
+ /* already a multilistener, just add to it */
+ MultiListenerHandler multiHandler
+ = (MultiListenerHandler) registeredHandler;
+ multiHandler.addHandler(handler);
+ }
+ }
+
+ @Override
+ public EventHandler getEventHandler() {
+ return new GenericEventHandler();
+ }
+
+ class GenericEventHandler implements EventHandler<Event> {
+ public void handle(Event event) {
+ /* all this method does is enqueue all the events onto the queue */
+ int qSize = eventQueue.size();
+ if (qSize !=0 && qSize %1000 == 0) {
+ LOG.info("Size of event-queue is " + qSize);
+ }
+ int remCapacity = eventQueue.remainingCapacity();
+ if (remCapacity < 1000) {
+ LOG.warn("Very low remaining capacity in the event-queue: "
+ + remCapacity);
+ }
+ try {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(id + ",add event:" +
+ event.getType() + "," + event + "," +
+ (eventHandlingThread == null ? "null" : eventHandlingThread.isAlive()));
+ }
+ eventQueue.put(event);
+ } catch (InterruptedException e) {
+ if (!stopped) {
+ LOG.warn("AsyncDispatcher thread interrupted", e);
+ }
+ throw new YarnException(e);
+ }
+ }
+ }
+
+ /**
+ * Multiplexing an event. Sending it to different handlers that
+ * are interested in the event.
+ */
+ static class MultiListenerHandler implements EventHandler<Event> {
+ List<EventHandler<Event>> listofHandlers;
+
+ public MultiListenerHandler() {
+ listofHandlers = new ArrayList<EventHandler<Event>>();
+ }
+
+ @Override
+ public void handle(Event event) {
+ for (EventHandler<Event> handler: listofHandlers) {
+ handler.handle(event);
+ }
+ }
+
+ void addHandler(EventHandler<Event> handler) {
+ listofHandlers.add(handler);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
new file mode 100644
index 0000000..5359311
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.event.QueryEvent;
+import org.apache.tajo.master.event.QueryEventType;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.rm.TajoWorkerContainer;
+import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.ProtoAsyncRpcClient;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TajoContainerProxy extends ContainerProxy {
+ public TajoContainerProxy(QueryMasterTask.QueryContext context,
+ Configuration conf, Container container,
+ ExecutionBlockId executionBlockId) {
+ super(context, conf, executionBlockId, container);
+ }
+
+ @Override
+ public void launch(ContainerLaunchContext containerLaunchContext) {
+ context.getResourceAllocator().addContainer(containerID, this);
+ this.hostName = container.getNodeId().getHost();
+ this.port = context.getQueryMasterContext().getWorkerContext().getPullService().getPort();
+ this.state = ContainerState.RUNNING;
+
+ assignExecutionBlock(executionBlockId, container);
+
+ context.getEventHandler().handle(new QueryEvent(context.getQueryId(), QueryEventType.INIT_COMPLETED));
+ }
+
+ private void assignExecutionBlock(ExecutionBlockId executionBlockId, Container container) {
+ ProtoAsyncRpcClient tajoWorkerRpc = null;
+ try {
+ InetSocketAddress myAddr= context.getQueryMasterContext().getWorkerContext()
+ .getTajoWorkerManagerService().getBindAddr();
+
+ InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort());
+ tajoWorkerRpc = new ProtoAsyncRpcClient(TajoWorkerProtocol.class, addr);
+ TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
+
+ TajoWorkerProtocol.RunExecutionBlockRequestProto request =
+ TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder()
+ .setExecutionBlockId(executionBlockId.toString())
+ .setQueryMasterHost(myAddr.getHostName())
+ .setQueryMasterPort(myAddr.getPort())
+ .setNodeId(container.getNodeId().toString())
+ .setContainerId(container.getId().toString())
+ .setQueryOutputPath(context.getOutputPath().toString())
+ .build();
+
+ tajoWorkerRpcClient.executeExecutionBlock(null, request, NullCallback.get());
+ } catch (Exception e) {
+ //TODO retry
+ LOG.error(e.getMessage(), e);
+ } finally {
+ if(tajoWorkerRpc != null) {
+ (new AyncRpcClose(tajoWorkerRpc)).start();
+ }
+ }
+ }
+
+ class AyncRpcClose extends Thread {
+ ProtoAsyncRpcClient client;
+ public AyncRpcClose(ProtoAsyncRpcClient client) {
+ this.client = client;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ client.close();
+ }
+ }
+
+ @Override
+ public synchronized void stopContainer() {
+ LOG.info("Release TajoWorker Resource: " + executionBlockId + "," + containerID + ", state:" + this.state);
+ if(isCompletelyDone()) {
+ LOG.info("====> Container already stopped:" + containerID);
+ return;
+ }
+ if(this.state == ContainerState.PREP) {
+ this.state = ContainerState.KILLED_BEFORE_LAUNCH;
+ } else {
+ try {
+ releaseWorkerResource(context, executionBlockId, ((TajoWorkerContainer)container).getWorkerResource());
+ context.getResourceAllocator().removeContainer(containerID);
+ this.state = ContainerState.DONE;
+ } catch (Throwable t) {
+ // ignore the cleanup failure
+ String message = "cleanup failed for container "
+ + this.containerID + " : "
+ + StringUtils.stringifyException(t);
+ LOG.warn(message);
+ this.state = ContainerState.DONE;
+ return;
+ }
+ }
+ }
+
+ public static void releaseWorkerResource(QueryMasterTask.QueryContext context,
+ ExecutionBlockId executionBlockId,
+ WorkerResource workerResource) throws Exception {
+ List<WorkerResource> workerResources = new ArrayList<WorkerResource>();
+ workerResources.add(workerResource);
+
+ releaseWorkerResource(context, executionBlockId, workerResources);
+ }
+
+ public static void releaseWorkerResource(QueryMasterTask.QueryContext context,
+ ExecutionBlockId executionBlockId,
+ List<WorkerResource> workerResources) throws Exception {
+ List<TajoMasterProtocol.WorkerResourceProto> workerResourceProtos =
+ new ArrayList<TajoMasterProtocol.WorkerResourceProto>();
+
+ for(WorkerResource eahWorkerResource: workerResources) {
+ workerResourceProtos.add(TajoMasterProtocol.WorkerResourceProto.newBuilder()
+ .setWorkerHostAndPort(eahWorkerResource.getId())
+ .setExecutionBlockId(executionBlockId.getProto())
+ .setMemoryMBSlots(eahWorkerResource.getMemoryMBSlots())
+ .setDiskSlots(eahWorkerResource.getDiskSlots())
+ .build()
+ );
+ }
+ context.getQueryMasterContext().getWorkerContext().getTajoMasterRpcClient()
+ .releaseWorkerResource(null,
+ TajoMasterProtocol.WorkerResourceReleaseRequest.newBuilder()
+ .addAllWorkerResources(workerResourceProtos)
+ .build(),
+ NullCallback.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
index b84b51b..f22472a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -18,7 +18,6 @@
package org.apache.tajo.master;
-import com.google.common.collect.Maps;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,12 +31,9 @@ import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
@@ -47,16 +43,15 @@ import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.function.Country;
import org.apache.tajo.engine.function.InCountry;
import org.apache.tajo.engine.function.builtin.*;
-import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.master.querymaster.QueryMasterManager;
-import org.apache.tajo.master.querymaster.QueryMasterManagerService;
+import org.apache.tajo.master.querymaster.QueryJobManager;
+import org.apache.tajo.master.rm.TajoWorkerResourceManager;
+import org.apache.tajo.master.rm.WorkerResourceManager;
import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.webapp.StaticHttpServer;
+import java.lang.reflect.Constructor;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
-import java.util.Map;
public class TajoMaster extends CompositeService {
@@ -79,12 +74,14 @@ public class TajoMaster extends CompositeService {
private GlobalEngine globalEngine;
private AsyncDispatcher dispatcher;
private TajoMasterClientService tajoMasterClientService;
- private QueryMasterManagerService queryMasterManagerService;
- private YarnRPC yarnRPC;
+ private TajoMasterService tajoMasterService;
+ private WorkerResourceManager resourceManager;
//Web Server
private StaticHttpServer webServer;
+ private QueryJobManager queryJobManager;
+
public TajoMaster() throws Exception {
super(TajoMaster.class.getName());
}
@@ -96,16 +93,23 @@ public class TajoMaster extends CompositeService {
context = new MasterContext(conf);
clock = new SystemClock();
-
try {
RackResolver.init(conf);
+// this.conf.writeXml(System.out);
+ String className = this.conf.get("tajo.resource.manager", TajoWorkerResourceManager.class.getCanonicalName());
+ Class<WorkerResourceManager> resourceManagerClass =
+ (Class<WorkerResourceManager>)Class.forName(className);
+
+ Constructor<WorkerResourceManager> constructor = resourceManagerClass.getConstructor(MasterContext.class);
+ resourceManager = constructor.newInstance(context);
+ resourceManager.init(context.getConf());
+
+ //TODO WebServer port configurable
webServer = StaticHttpServer.getInstance(this ,"admin", null, 8080 ,
true, null, context.getConf(), null);
webServer.start();
- QueryIdFactory.reset();
-
// Get the tajo base dir
this.basePath = new Path(conf.getVar(ConfVars.ROOT_DIR));
LOG.info("Tajo Root dir is set " + basePath);
@@ -128,8 +132,6 @@ public class TajoMaster extends CompositeService {
LOG.info("Warehouse dir (" + wareHousePath + ") is created");
}
- yarnRPC = YarnRPC.create(conf);
-
this.dispatcher = new AsyncDispatcher();
addIfService(dispatcher);
@@ -149,15 +151,19 @@ public class TajoMaster extends CompositeService {
globalEngine = new GlobalEngine(context);
addIfService(globalEngine);
+ queryJobManager = new QueryJobManager(context);
+ addIfService(queryJobManager);
+
tajoMasterClientService = new TajoMasterClientService(context);
addIfService(tajoMasterClientService);
- queryMasterManagerService = new QueryMasterManagerService(context);
- addIfService(queryMasterManagerService);
+ tajoMasterService = new TajoMasterService(context);
+ addIfService(tajoMasterService);
} catch (Exception e) {
- e.printStackTrace();
+ LOG.error(e.getMessage(), e);
}
+ LOG.info("====> Tajo master started");
super.init(conf);
}
@@ -282,10 +288,6 @@ public class TajoMaster extends CompositeService {
LOG.error(e);
}
- for(QueryMasterManager eachQuery: getContext().getAllQueries().values()) {
- eachQuery.stop();
- }
-
super.stop();
LOG.info("TajoMaster main thread exiting");
}
@@ -310,40 +312,7 @@ public class TajoMaster extends CompositeService {
return this.storeManager;
}
- // TODO - to be improved
- public Collection<QueryMasterProtocol.TaskStatusProto> getProgressQueries() {
- return null;
- }
-
-// private class QueryEventDispatcher implements EventHandler<QueryEvent> {
-// @Override
-// public void handle(QueryEvent queryEvent) {
-// LOG.info("QueryEvent: " + queryEvent.getQueryId());
-// LOG.info("Found: " + context.getQuery(queryEvent.getQueryId()).getContext().getQueryId());
-// context.getQuery(queryEvent.getQueryId()).handle(queryEvent);
-// }
-// }
-
- public static void main(String[] args) throws Exception {
- StringUtils.startupShutdownMessage(TajoMaster.class, args, LOG);
-
- try {
- TajoMaster master = new TajoMaster();
- ShutdownHookManager.get().addShutdownHook(
- new CompositeServiceShutdownHook(master),
- SHUTDOWN_HOOK_PRIORITY);
- TajoConf conf = new TajoConf(new YarnConfiguration());
- master.init(conf);
- master.start();
- } catch (Throwable t) {
- LOG.fatal("Error starting TajoMaster", t);
- System.exit(-1);
- }
- }
-
public class MasterContext {
- //private final Map<QueryId, QueryMaster> queries = Maps.newConcurrentMap();
- private final Map<QueryId, QueryMasterManager> queries = Maps.newConcurrentMap();
private final TajoConf conf;
public MasterContext(TajoConf conf) {
@@ -358,20 +327,12 @@ public class TajoMaster extends CompositeService {
return clock;
}
- public QueryMasterManager getQuery(QueryId queryId) {
- return queries.get(queryId);
+ public QueryJobManager getQueryJobManager() {
+ return queryJobManager;
}
- public Map<QueryId, QueryMasterManager> getAllQueries() {
- return queries;
- }
-
- public void addQuery(QueryId queryId, QueryMasterManager queryMasterManager) {
- queries.put(queryId, queryMasterManager);
- }
-
- public AsyncDispatcher getDispatcher() {
- return dispatcher;
+ public WorkerResourceManager getResourceManager() {
+ return resourceManager;
}
public EventHandler getEventHandler() {
@@ -390,16 +351,23 @@ public class TajoMaster extends CompositeService {
return storeManager;
}
- public YarnRPC getYarnRPC() {
- return yarnRPC;
+ public TajoMasterService getTajoMasterService() {
+ return tajoMasterService;
}
+ }
- public TajoMasterClientService getClientService() {
- return tajoMasterClientService;
- }
+ public static void main(String[] args) throws Exception {
+ StringUtils.startupShutdownMessage(TajoMaster.class, args, LOG);
- public QueryMasterManagerService getQueryMasterManagerService() {
- return queryMasterManagerService;
+ try {
+ TajoMaster master = new TajoMaster();
+ ShutdownHookManager.get().addShutdownHook(new CompositeServiceShutdownHook(master), SHUTDOWN_HOOK_PRIORITY);
+ TajoConf conf = new TajoConf(new YarnConfiguration());
+ master.init(conf);
+ master.start();
+ } catch (Throwable t) {
+ LOG.fatal("Error starting TajoMaster", t);
+ System.exit(-1);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index ed1376c..8578b64 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -26,9 +26,11 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.exception.AlreadyExistsTableException;
@@ -37,18 +39,18 @@ import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto;
import org.apache.tajo.catalog.statistics.TableStat;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.query.exception.SQLSyntaxError;
+import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.ClientProtos.*;
import org.apache.tajo.ipc.TajoMasterClientProtocol;
import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.master.querymaster.QueryMasterManager;
+import org.apache.tajo.master.querymaster.QueryInProgress;
+import org.apache.tajo.master.querymaster.QueryInfo;
+import org.apache.tajo.master.querymaster.QueryJobManager;
import org.apache.tajo.rpc.ProtoBlockingRpcServer;
import org.apache.tajo.rpc.RemoteException;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.TajoIdUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -88,9 +90,10 @@ public class TajoMasterClientService extends AbstractService {
LOG.error(e);
}
server.start();
- bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
- this.conf.setVar(ConfVars.CLIENT_SERVICE_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
- LOG.info("TajoMasterClientService startup");
+ bindAddress = server.getListenAddress();
+ this.conf.setVar(ConfVars.CLIENT_SERVICE_ADDRESS,
+ org.apache.tajo.util.NetUtils.getIpPortString(bindAddress));
+ LOG.info("Instantiated TajoMasterClientService at " + this.bindAddress);
super.start();
}
@@ -99,7 +102,6 @@ public class TajoMasterClientService extends AbstractService {
if (server != null) {
server.shutdown();
}
- LOG.info("TajoMasterClientService shutdown");
super.stop();
}
@@ -123,40 +125,26 @@ public class TajoMasterClientService extends AbstractService {
}
@Override
- public SubmitQueryResponse submitQuery(RpcController controller,
+ public GetQueryStatusResponse submitQuery(RpcController controller,
QueryRequest request)
throws ServiceException {
- QueryId queryId;
- SubmitQueryResponse.Builder build = SubmitQueryResponse.newBuilder();
try {
- queryId = context.getGlobalEngine().executeQuery(request.getQuery());
- } catch (SQLSyntaxError e) {
- build.setResultCode(ResultCode.ERROR);
- build.setErrorMessage(e.getMessage());
- return build.build();
-
- } catch (Exception e) {
- build.setResultCode(ResultCode.ERROR);
- String msg = e.getMessage();
- if (msg == null) {
- msg = "Internal Error";
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Query [" + request.getQuery() + "] is submitted");
}
-
- if (LOG.isDebugEnabled()) {
- LOG.error(msg, e);
+ return context.getGlobalEngine().executeQuery(request.getQuery());
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ ClientProtos.GetQueryStatusResponse.Builder responseBuilder = ClientProtos.GetQueryStatusResponse.newBuilder();
+ responseBuilder.setResultCode(ResultCode.ERROR);
+ if (e.getMessage() != null) {
+ responseBuilder.setErrorMessage(ExceptionUtils.getStackTrace(e));
} else {
- LOG.error(msg);
+ responseBuilder.setErrorMessage("Internal Error");
}
- build.setErrorMessage(msg);
- return build.build();
+ return responseBuilder.build();
}
-
- LOG.info("Query " + queryId + " is submitted");
- build.setResultCode(ResultCode.OK);
- build.setQueryId(queryId.getProto());
-
- return build.build();
}
@Override
@@ -183,13 +171,17 @@ public class TajoMasterClientService extends AbstractService {
GetQueryResultRequest request)
throws ServiceException {
QueryId queryId = new QueryId(request.getQueryId());
- QueryMasterManager queryMasterManager = context.getQuery(queryId);
+ if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
+ }
+ QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId);
+ QueryInfo queryInfo = queryInProgress.getQueryInfo();
GetQueryResultResponse.Builder builder
= GetQueryResultResponse.newBuilder();
- switch (queryMasterManager.getState()) {
+ switch (queryInfo.getQueryState()) {
case QUERY_SUCCEEDED:
- builder.setTableDesc((TableDescProto) queryMasterManager.getResultDesc().getProto());
+ // TODO check this logic needed
+ //builder.setTableDesc((TableDescProto) queryJobManager.getResultDesc().getProto());
break;
case QUERY_FAILED:
case QUERY_ERROR:
@@ -218,23 +210,25 @@ public class TajoMasterClientService extends AbstractService {
QueryId queryId = new QueryId(request.getQueryId());
builder.setQueryId(request.getQueryId());
- if (queryId.equals(TajoIdUtils.NullQueryId)) {
+ if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
builder.setResultCode(ResultCode.OK);
builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
} else {
- QueryMasterManager queryMasterManager = context.getQuery(queryId);
- if (queryMasterManager != null) {
+ QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId);
+ if (queryInProgress != null) {
+ QueryInfo queryInfo = queryInProgress.getQueryInfo();
builder.setResultCode(ResultCode.OK);
- builder.setState(queryMasterManager.getState());
- builder.setProgress(queryMasterManager.getProgress());
- builder.setSubmitTime(queryMasterManager.getAppSubmitTime());
- if(queryMasterManager.getQueryMasterHost() != null) {
- builder.setQueryMasterHost(queryMasterManager.getQueryMasterHost());
- builder.setQueryMasterPort(queryMasterManager.getQueryMasterClientPort());
+ builder.setState(queryInfo.getQueryState());
+ builder.setProgress(queryInfo.getProgress());
+ builder.setSubmitTime(queryInfo.getStartTime());
+ if(queryInfo.getQueryMasterHost() != null) {
+ builder.setQueryMasterHost(queryInfo.getQueryMasterHost());
+ builder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
}
-
- if (queryMasterManager.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
- builder.setFinishTime(queryMasterManager.getFinishTime());
+ //builder.setInitTime(queryJobManager.getInitializationTime());
+ //builder.setHasResult(!queryJobManager.isCreateTableStmt());
+ if (queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+ builder.setFinishTime(queryInfo.getFinishTime());
} else {
builder.setFinishTime(System.currentTimeMillis());
}
@@ -249,11 +243,12 @@ public class TajoMasterClientService extends AbstractService {
@Override
public BoolProto killQuery(RpcController controller,
- ApplicationAttemptIdProto request)
+ TajoIdProtos.QueryIdProto request)
throws ServiceException {
QueryId queryId = new QueryId(request);
- QueryMasterManager queryMasterManager = context.getQuery(queryId);
- //queryMasterManager.handle(new QueryEvent(queryId, QueryEventType.KILL));
+ QueryJobManager queryJobManager = context.getQueryJobManager();
+ //TODO KHJ, change QueryJobManager to event handler
+ //queryJobManager.handle(new QueryEvent(queryId, QueryEventType.KILL));
return BOOL_TRUE;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
new file mode 100644
index 0000000..f0a4618
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.querymaster.QueryJobManager;
+import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.rpc.ProtoAsyncRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+public class TajoMasterService extends AbstractService {
+ private final static Log LOG = LogFactory.getLog(TajoMasterService.class);
+
+ private final TajoMaster.MasterContext context;
+ private final TajoConf conf;
+ private final TajoMasterServiceHandler masterHandler;
+ private ProtoAsyncRpcServer server;
+ private InetSocketAddress bindAddress;
+
+ private final BoolProto BOOL_TRUE = BoolProto.newBuilder().setValue(true).build();
+ private final BoolProto BOOL_FALSE = BoolProto.newBuilder().setValue(false).build();
+
+ public TajoMasterService(TajoMaster.MasterContext context) {
+ super(TajoMasterService.class.getName());
+ this.context = context;
+ this.conf = context.getConf();
+ this.masterHandler = new TajoMasterServiceHandler();
+ }
+
+ @Override
+ public void start() {
+ // TODO resolve hostname
+ String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_SERVICE_ADDRESS);
+ InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
+ try {
+ server = new ProtoAsyncRpcServer(TajoMasterProtocol.class, masterHandler, initIsa);
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ server.start();
+ bindAddress = server.getListenAddress();
+ this.conf.setVar(TajoConf.ConfVars.TAJO_MASTER_SERVICE_ADDRESS,
+ org.apache.tajo.util.NetUtils.getIpPortString(bindAddress));
+ LOG.info("Instantiated TajoMasterService at " + this.bindAddress);
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ if(server != null) {
+ server.shutdown();
+ server = null;
+ }
+ super.stop();
+ }
+
+ public InetSocketAddress getBindAddress() {
+ return bindAddress;
+ }
+
+ public class TajoMasterServiceHandler
+ implements TajoMasterProtocol.TajoMasterProtocolService.Interface {
+ @Override
+ public void heartbeat(
+ RpcController controller,
+ TajoMasterProtocol.TajoHeartbeat request, RpcCallback<TajoMasterProtocol.TajoHeartbeatResponse> done) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Received QueryHeartbeat:" + request.getTajoWorkerHost() + ":" + request.getTajoWorkerPort());
+ }
+
+ TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand command = null;
+ if(request.hasQueryId()) {
+ QueryId queryId = new QueryId(request.getQueryId());
+
+ //heartbeat from querymaster
+ //LOG.info("Received QueryHeartbeat:" + queryId + "," + request);
+ QueryJobManager queryJobManager = context.getQueryJobManager();
+ command = queryJobManager.queryHeartbeat(request);
+ } else {
+ //heartbeat from TajoWorker
+ context.getResourceManager().workerHeartbeat(request);
+ }
+
+ //ApplicationAttemptId attemptId = queryJobManager.getAppAttemptId();
+ //String attemptIdStr = attemptId == null ? null : attemptId.toString();
+ TajoMasterProtocol.TajoHeartbeatResponse.Builder builder = TajoMasterProtocol.TajoHeartbeatResponse.newBuilder();
+ builder.setHeartbeatResult(BOOL_TRUE);
+ if(command != null) {
+ builder.setResponseCommand(command);
+ }
+ done.run(builder.build());
+ }
+
+ @Override
+ public void allocateWorkerResources(
+ RpcController controller,
+ TajoMasterProtocol.WorkerResourceAllocationRequest request,
+ RpcCallback<TajoMasterProtocol.WorkerResourceAllocationResponse> done) {
+ context.getResourceManager().allocateWorkerResources(request, done);
+
+// List<String> workerHosts = new ArrayList<String>();
+// for(WorkerResource eachWorker: workerResources) {
+// workerHosts.add(eachWorker.getAllocatedHost() + ":" + eachWorker.getPorts()[0]);
+// }
+//
+// done.run(TajoMasterProtocol.WorkerResourceAllocationResponse.newBuilder()
+// .setExecutionBlockId(request.getExecutionBlockId())
+// .addAllAllocatedWorks(workerHosts)
+// .build()
+// );
+ }
+
+ @Override
+ public void releaseWorkerResource(RpcController controller,
+ TajoMasterProtocol.WorkerResourceReleaseRequest request,
+ RpcCallback<PrimitiveProtos.BoolProto> done) {
+ List<TajoMasterProtocol.WorkerResourceProto> workerResources = request.getWorkerResourcesList();
+ for(TajoMasterProtocol.WorkerResourceProto eachWorkerResource: workerResources) {
+ WorkerResource workerResource = new WorkerResource();
+ String[] tokens = eachWorkerResource.getWorkerHostAndPort().split(":");
+ workerResource.setAllocatedHost(tokens[0]);
+ workerResource.setPorts(new int[]{Integer.parseInt(tokens[1])});
+ workerResource.setMemoryMBSlots(eachWorkerResource.getMemoryMBSlots());
+ workerResource.setDiskSlots(eachWorkerResource.getDiskSlots());
+
+ LOG.info("====> releaseWorkerResource:" + workerResource);
+ context.getResourceManager().releaseWorkerResource(
+ new QueryId(eachWorkerResource.getExecutionBlockId().getQueryId()),
+ workerResource);
+ }
+ done.run(BOOL_TRUE);
+ }
+
+ @Override
+ public void stopQueryMaster(RpcController controller, TajoIdProtos.QueryIdProto request,
+ RpcCallback<BoolProto> done) {
+ context.getQueryJobManager().stopQuery(new QueryId(request));
+ done.run(BOOL_TRUE);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
index 93aaa5d..1e6655c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
@@ -20,7 +20,7 @@ package org.apache.tajo.master;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.SubQueryId;
+import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
import java.util.Collection;
@@ -31,17 +31,21 @@ public class TaskRunnerGroupEvent extends AbstractEvent<EventType> {
CONTAINER_REMOTE_CLEANUP
}
- protected final SubQueryId subQueryId;
+ protected final ExecutionBlockId executionBlockId;
protected final Collection<Container> containers;
public TaskRunnerGroupEvent(EventType eventType,
- SubQueryId subQueryId,
+ ExecutionBlockId executionBlockId,
Collection<Container> containers) {
super(eventType);
- this.subQueryId = subQueryId;
+ this.executionBlockId = executionBlockId;
this.containers = containers;
}
public Collection<Container> getContainers() {
return containers;
}
+
+ public ExecutionBlockId getExecutionBlockId() {
+ return executionBlockId;
+ }
}