You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/22 10:10:33 UTC
git commit: TAJO-783: Remove yarn-related code from tajo-core. (hyunsik)
Repository: tajo
Updated Branches:
refs/heads/master 9cd8dac35 -> 882f92c6d
TAJO-783: Remove yarn-related code from tajo-core. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/882f92c6
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/882f92c6
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/882f92c6
Branch: refs/heads/master
Commit: 882f92c6d74818eee7f086a7c5f9467b44101abf
Parents: 9cd8dac
Author: Hyunsik Choi <hy...@apache.org>
Authored: Tue Apr 22 17:04:39 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Tue Apr 22 17:10:15 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 5 +
tajo-core/pom.xml | 26 --
.../tajo/master/DefaultTaskScheduler.java | 4 +-
.../apache/tajo/master/LazyTaskScheduler.java | 8 +-
.../apache/tajo/master/YarnContainerProxy.java | 414 -------------------
.../tajo/master/YarnTaskRunnerLauncherImpl.java | 200 ---------
.../master/querymaster/QueryMasterTask.java | 9 +-
.../master/rm/YarnRMContainerAllocator.java | 237 -----------
.../tajo/master/rm/YarnTajoResourceManager.java | 349 ----------------
.../tajo/worker/YarnResourceAllocator.java | 117 ------
.../org/apache/tajo/TajoTestingCluster.java | 8 +-
tajo-maven-plugins/pom.xml | 1 +
tajo-project/pom.xml | 6 +
13 files changed, 25 insertions(+), 1359 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a193a1e..de78d31 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,11 @@ Release 0.9.0 - unreleased
TAJO-769: A minor improvements for HCatalogStore (Fengdong Yu via hyunsik)
+
+ SUB TASKS
+
+ TAJO-783: Remove yarn-related code from tajo-core. (hyunsik)
+
Release 0.8.0 - unreleased
NEW FEATURES
http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index 2f38e92..f90e089 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -303,35 +303,9 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-client</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-nodemanager</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-tests</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index 409a1b1..9978670 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -269,12 +269,12 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
}
int qSize = taskRequestQueue.size();
if (qSize != 0 && qSize % 1000 == 0) {
- LOG.info("Size of event-queue in YarnRMContainerAllocator is " + qSize);
+ LOG.info("Size of event-queue in DefaultTaskScheduler is " + qSize);
}
int remCapacity = taskRequestQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.warn("Very low remaining capacity in the event-queue "
- + "of YarnRMContainerAllocator: " + remCapacity);
+ + "of DefaultTaskScheduler: " + remCapacity);
}
taskRequestQueue.add(event);
http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
index 434ea22..dd82f28 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
@@ -186,12 +186,12 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
public void handle(TaskSchedulerEvent event) {
int qSize = eventQueue.size();
if (qSize != 0 && qSize % 1000 == 0) {
- LOG.info("Size of event-queue in YarnRMContainerAllocator is " + qSize);
+ LOG.info("Size of event-queue in DefaultTaskScheduler is " + qSize);
}
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.warn("Very low remaining capacity in the event-queue "
- + "of YarnRMContainerAllocator: " + remCapacity);
+ + "of DefaultTaskScheduler: " + remCapacity);
}
if (event.getType() == EventType.T_SCHEDULE) {
@@ -305,12 +305,12 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
}
int qSize = taskRequestQueue.size();
if (qSize != 0 && qSize % 1000 == 0) {
- LOG.info("Size of event-queue in YarnRMContainerAllocator is " + qSize);
+ LOG.info("Size of event-queue in DefaultTaskScheduler is " + qSize);
}
int remCapacity = taskRequestQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.warn("Very low remaining capacity in the event-queue "
- + "of YarnRMContainerAllocator: " + remCapacity);
+ + "of DefaultTaskScheduler: " + remCapacity);
}
taskRequestQueue.add(event);
http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-core/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/YarnContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
deleted file mode 100644
index 4f178fb..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
-import org.apache.tajo.pullserver.PullServerAuxService;
-import org.apache.tajo.worker.TajoWorker;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
-import java.util.*;
-
-public class YarnContainerProxy extends ContainerProxy {
- private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
- protected final YarnRPC yarnRPC;
- final protected String containerMgrAddress;
- protected Token containerToken;
-
- public YarnContainerProxy(QueryMasterTask.QueryMasterTaskContext context, Configuration conf, YarnRPC yarnRPC,
- Container container, ExecutionBlockId executionBlockId) {
- super(context, conf, executionBlockId, container);
- this.yarnRPC = yarnRPC;
-
- NodeId nodeId = container.getNodeId();
- this.containerMgrAddress = nodeId.getHost() + ":" + nodeId.getPort();
- this.containerToken = container.getContainerToken();
- }
-
- protected ContainerManagementProtocol getCMProxy(ContainerId containerID,
- final String containerManagerBindAddr,
- Token containerToken)
- throws IOException {
- String [] hosts = containerManagerBindAddr.split(":");
- final InetSocketAddress cmAddr =
- new InetSocketAddress(hosts[0], Integer.parseInt(hosts[1]));
- UserGroupInformation user = UserGroupInformation.getCurrentUser();
-
- if (UserGroupInformation.isSecurityEnabled()) {
- org.apache.hadoop.security.token.Token<ContainerTokenIdentifier> token =
- ConverterUtils.convertFromYarn(containerToken, cmAddr);
- // the user in createRemoteUser in this context has to be ContainerID
- user = UserGroupInformation.createRemoteUser(containerID.toString());
- user.addToken(token);
- }
-
- ContainerManagementProtocol proxy = user.doAs(new PrivilegedAction<ContainerManagementProtocol>() {
- @Override
- public ContainerManagementProtocol run() {
- return (ContainerManagementProtocol) yarnRPC.getProxy(ContainerManagementProtocol.class,
- cmAddr, conf);
- }
- });
-
- return proxy;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public synchronized void launch(ContainerLaunchContext commonContainerLaunchContext) {
- LOG.info("Launching Container with Id: " + containerID);
- if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
- state = ContainerState.DONE;
- LOG.error("Container (" + containerID + " was killed before it was launched");
- return;
- }
-
- ContainerManagementProtocol proxy = null;
- try {
-
- proxy = getCMProxy(containerID, containerMgrAddress,
- containerToken);
-
- // Construct the actual Container
- ContainerLaunchContext containerLaunchContext = createContainerLaunchContext(commonContainerLaunchContext);
-
- // Now launch the actual container
- List<StartContainerRequest> startRequestList = new ArrayList<StartContainerRequest>();
- StartContainerRequest startRequest = Records
- .newRecord(StartContainerRequest.class);
- startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequestList.add(startRequest);
- StartContainersRequest startRequests = Records.newRecord(StartContainersRequest.class);
- startRequests.setStartContainerRequests(startRequestList);
- StartContainersResponse response = proxy.startContainers(startRequests);
-
- ByteBuffer portInfo = response.getAllServicesMetaData().get(PullServerAuxService.PULLSERVER_SERVICEID);
-
- if(portInfo != null) {
- port = PullServerAuxService.deserializeMetaData(portInfo);
- }
-
- LOG.info("PullServer port returned by ContainerManager for "
- + containerID + " : " + port);
-
- if(port < 0) {
- this.state = ContainerState.FAILED;
- throw new IllegalStateException("Invalid shuffle port number "
- + port + " returned for " + containerID);
- }
-
- this.state = ContainerState.RUNNING;
- this.hostName = containerMgrAddress.split(":")[0];
- context.getResourceAllocator().addContainer(containerID, this);
- } catch (Throwable t) {
- String message = "Container launch failed for " + containerID + " : "
- + StringUtils.stringifyException(t);
- this.state = ContainerState.FAILED;
- LOG.error(message);
- } finally {
- if (proxy != null) {
- yarnRPC.stopProxy(proxy, conf);
- }
- }
- }
-
-
- public ContainerLaunchContext createContainerLaunchContext(ContainerLaunchContext commonContainerLaunchContext) {
- // Setup environment by cloning from common env.
- Map<String, String> env = commonContainerLaunchContext.getEnvironment();
- Map<String, String> myEnv = new HashMap<String, String>(env.size());
- myEnv.putAll(env);
-
- // Duplicate the ByteBuffers for access by multiple containers.
- Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
- for (Map.Entry<String, ByteBuffer> entry : commonContainerLaunchContext.getServiceData().entrySet()) {
- myServiceData.put(entry.getKey(), entry.getValue().duplicate());
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // Set the local resources
- ////////////////////////////////////////////////////////////////////////////
- // Set the necessary command to execute the application master
- Vector<CharSequence> vargs = new Vector<CharSequence>(30);
-
- // Set java executable command
- //LOG.info("Setting up app master command");
- vargs.add("${JAVA_HOME}" + "/bin/java");
- // Set Xmx based on am memory size
- vargs.add("-Xmx2000m");
- // Set Remote Debugging
- //if (!context.getQuery().getSubQuery(event.getExecutionBlockId()).isLeafQuery()) {
- //vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
- //}
- // Set class name
- //vargs.add(getRunnerClass());
- vargs.add(TajoWorker.class.getCanonicalName());
- vargs.add("tr"); //workerMode
- vargs.add(getId()); // subqueryId
- vargs.add(containerMgrAddress); // nodeId
- vargs.add(containerID.toString()); // containerId
- Vector<CharSequence> taskParams = getTaskParams();
- if(taskParams != null) {
- vargs.addAll(taskParams);
- }
-
- vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
- vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
-
- // Get final commmand
- StringBuilder command = new StringBuilder();
- for (CharSequence str : vargs) {
- command.append(str).append(" ");
- }
-
- LOG.info("Completed setting up TaskRunner command " + command.toString());
- List<String> commands = new ArrayList<String>();
- commands.add(command.toString());
-
- return BuilderUtils.newContainerLaunchContext(commonContainerLaunchContext.getLocalResources(),
- myEnv,
- commands,
- myServiceData,
- null,
- new HashMap<ApplicationAccessType, String>());
- }
-
- public static ContainerLaunchContext createCommonContainerLaunchContext(Configuration config,
- String queryId, boolean isMaster) {
- TajoConf conf = (TajoConf)config;
-
- ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
-
- try {
- ByteBuffer userToken = ByteBuffer.wrap(UserGroupInformation.getCurrentUser().getShortUserName().getBytes());
- ctx.setTokens(userToken);
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // Set the env variables to be setup
- ////////////////////////////////////////////////////////////////////////////
- LOG.info("Set the environment for the application master");
-
- Map<String, String> environment = new HashMap<String, String>();
- //String initialClassPath = getInitialClasspath(conf);
- environment.put(ApplicationConstants.Environment.SHELL.name(), "/bin/bash");
- if(System.getenv(ApplicationConstants.Environment.JAVA_HOME.name()) != null) {
- environment.put(ApplicationConstants.Environment.JAVA_HOME.name(), System.getenv(ApplicationConstants.Environment.JAVA_HOME.name()));
- }
-
- // TODO - to be improved with org.apache.tajo.sh shell script
- Properties prop = System.getProperties();
-
- if (prop.getProperty("tajo.test", "FALSE").equalsIgnoreCase("TRUE") ||
- (System.getenv("tajo.test") != null && System.getenv("tajo.test").equalsIgnoreCase("TRUE"))) {
- LOG.info("tajo.test is TRUE");
- environment.put(ApplicationConstants.Environment.CLASSPATH.name(), prop.getProperty("java.class.path", null));
- environment.put("tajo.test", "TRUE");
- } else {
- // Add AppMaster.jar location to classpath
- // At some point we should not be required to add
- // the hadoop specific classpaths to the env.
- // It should be provided out of the box.
- // For now setting all required classpaths including
- // the classpath to "." for the application jar
- StringBuilder classPathEnv = new StringBuilder("./");
- //for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) {
- for (String c : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH) {
- classPathEnv.append(':');
- classPathEnv.append(c.trim());
- }
-
- classPathEnv.append(":" + System.getenv("TAJO_BASE_CLASSPATH"));
- classPathEnv.append(":./log4j.properties:./*");
- if(System.getenv("HADOOP_HOME") != null) {
- environment.put("HADOOP_HOME", System.getenv("HADOOP_HOME"));
- environment.put(
- ApplicationConstants.Environment.HADOOP_COMMON_HOME.name(),
- System.getenv("HADOOP_HOME"));
- environment.put(
- ApplicationConstants.Environment.HADOOP_HDFS_HOME.name(),
- System.getenv("HADOOP_HOME"));
- environment.put(
- ApplicationConstants.Environment.HADOOP_YARN_HOME.name(),
- System.getenv("HADOOP_HOME"));
- }
-
- if(System.getenv("TAJO_BASE_CLASSPATH") != null) {
- environment.put("TAJO_BASE_CLASSPATH", System.getenv("TAJO_BASE_CLASSPATH"));
- }
- environment.put(ApplicationConstants.Environment.CLASSPATH.name(), classPathEnv.toString());
- }
-
- ctx.setEnvironment(environment);
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("=================================================");
- for(Map.Entry<String, String> entry: environment.entrySet()) {
- LOG.debug(entry.getKey() + "=" + entry.getValue());
- }
- LOG.debug("=================================================");
- }
- ////////////////////////////////////////////////////////////////////////////
- // Set the local resources
- ////////////////////////////////////////////////////////////////////////////
- Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
- LOG.info("defaultFS: " + conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
-
- try {
- FileSystem fs = FileSystem.get(conf);
- FileContext fsCtx = FileContext.getFileContext(conf);
- Path systemConfPath = TajoConf.getSystemConfPath(conf);
- if (!fs.exists(systemConfPath)) {
- LOG.error("system_conf.xml (" + systemConfPath.toString() + ") Not Found");
- }
- LocalResource systemConfResource = createApplicationResource(fsCtx, systemConfPath, LocalResourceType.FILE);
- localResources.put(TajoConstants.SYSTEM_CONF_FILENAME, systemConfResource);
- ctx.setLocalResources(localResources);
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
- }
-
- Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
- try {
- serviceData.put(PullServerAuxService.PULLSERVER_SERVICEID, PullServerAuxService.serializeMetaData(0));
- } catch (IOException ioe) {
- LOG.error(ioe);
- }
- ctx.setServiceData(serviceData);
-
- return ctx;
- }
-
- private static LocalResource createApplicationResource(FileContext fs,
- Path p, LocalResourceType type)
- throws IOException {
- LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
- FileStatus rsrcStat = fs.getFileStatus(p);
- rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
- .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
- rsrc.setSize(rsrcStat.getLen());
- rsrc.setTimestamp(rsrcStat.getModificationTime());
- rsrc.setType(type);
- rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
- return rsrc;
- }
-
- private static void writeConf(Configuration conf, Path queryConfFile)
- throws IOException {
- // Write job file to Tajo's fs
- FileSystem fs = queryConfFile.getFileSystem(conf);
- FSDataOutputStream out =
- FileSystem.create(fs, queryConfFile,
- new FsPermission(QUERYCONF_FILE_PERMISSION));
- try {
- conf.writeXml(out);
- } finally {
- out.close();
- }
- }
-
- @Override
- public synchronized void stopContainer() {
-
- if(isCompletelyDone()) {
- return;
- }
- if(this.state == ContainerState.PREP) {
- this.state = ContainerState.KILLED_BEFORE_LAUNCH;
- } else {
- LOG.info("KILLING " + containerID);
-
- ContainerManagementProtocol proxy = null;
- try {
- proxy = getCMProxy(this.containerID, this.containerMgrAddress,
- this.containerToken);
-
- // kill the remote container if already launched
- List<ContainerId> willBeStopedIds = new ArrayList<ContainerId>();
- willBeStopedIds.add(this.containerID);
- StopContainersRequest stopRequests = Records.newRecord(StopContainersRequest.class);
- stopRequests.setContainerIds(willBeStopedIds);
- proxy.stopContainers(stopRequests);
- // If stopContainer returns without an error, assuming the stop made
- // it over to the NodeManager.
-// context.getEventHandler().handle(
-// new AMContainerEvent(containerID, AMContainerEventType.C_NM_STOP_SENT));
- context.getResourceAllocator().removeContainer(containerID);
- } catch (Throwable t) {
-
- // ignore the cleanup failure
- String message = "cleanup failed for container "
- + this.containerID + " : "
- + StringUtils.stringifyException(t);
-// context.getEventHandler().handle(
-// new AMContainerEventStopFailed(containerID, message));
- LOG.warn(message);
- this.state = ContainerState.DONE;
- return;
- } finally {
- if (proxy != null) {
- yarnRPC.stopProxy(proxy, conf);
- }
- }
- this.state = ContainerState.DONE;
- }
- }
-
- protected Vector<CharSequence> getTaskParams() {
- String queryMasterHost = context.getQueryMasterContext().getWorkerContext()
- .getTajoWorkerManagerService().getBindAddr().getHostName();
- int queryMasterPort = context.getQueryMasterContext().getWorkerContext()
- .getTajoWorkerManagerService().getBindAddr().getPort();
-
- Vector<CharSequence> taskParams = new Vector<CharSequence>();
- taskParams.add(queryMasterHost); // queryMaster hostname
- taskParams.add(String.valueOf(queryMasterPort)); // queryMaster port
- taskParams.add(context.getStagingDir().toString());
- return taskParams;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-core/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java b/tajo-core/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
deleted file mode 100644
index 8b18b5a..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
-
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class YarnTaskRunnerLauncherImpl extends AbstractService implements TaskRunnerLauncher {
-
- /** Class Logger */
- private static final Log LOG = LogFactory.getLog(YarnTaskRunnerLauncherImpl.class);
- //private final YarnRPC yarnRPC;
- private final static RecordFactory recordFactory =
- RecordFactoryProvider.getRecordFactory(null);
- private QueryMasterTask.QueryMasterTaskContext context;
-
- // For ContainerLauncherSpec
- private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
- private static String initialClasspath = null;
- private static final Object classpathLock = new Object();
- private ContainerLaunchContext commonContainerSpec = null;
-
- final public static FsPermission QUERYCONF_FILE_PERMISSION =
- FsPermission.createImmutable((short) 0644); // rw-r--r--
-
- /** for launching TaskRunners in parallel */
- private final ExecutorService executorService;
-
- private YarnRPC yarnRPC;
-
- public YarnTaskRunnerLauncherImpl(QueryMasterTask.QueryMasterTaskContext context, YarnRPC yarnRPC) {
- super(YarnTaskRunnerLauncherImpl.class.getName());
- this.context = context;
- this.yarnRPC = yarnRPC;
- executorService = Executors.newFixedThreadPool(
- context.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
- }
-
- public void start() {
- super.start();
- }
-
- public void stop() {
- executorService.shutdownNow();
-
- Map<ContainerId, ContainerProxy> containers = context.getResourceAllocator().getContainers();
- List<ContainerProxy> list = new ArrayList<ContainerProxy>(containers.values());
- for(ContainerProxy eachProxy: list) {
- try {
- eachProxy.stopContainer();
- } catch (Exception e) {
- }
- }
- super.stop();
- }
-
- @Override
- public void handle(TaskRunnerGroupEvent event) {
- if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
- launchTaskRunners(event.executionBlockId, event.getContainers());
- } else if (event.getType() == EventType.CONTAINER_REMOTE_CLEANUP) {
- stopTaskRunners(event.getContainers());
- }
- }
-
- private void launchTaskRunners(ExecutionBlockId executionBlockId, Collection<Container> containers) {
- commonContainerSpec = YarnContainerProxy.createCommonContainerLaunchContext(getConfig(),
- executionBlockId.getQueryId().toString(), false);
- for (Container container : containers) {
- final ContainerProxy proxy = new YarnContainerProxy(context, getConfig(),
- yarnRPC, container, executionBlockId);
- executorService.submit(new LaunchRunner(container.getId(), proxy));
- }
- }
-
- protected class LaunchRunner implements Runnable {
- private final ContainerProxy proxy;
- private final ContainerId id;
- public LaunchRunner(ContainerId id, ContainerProxy proxy) {
- this.proxy = proxy;
- this.id = id;
- }
- @Override
- public void run() {
- proxy.launch(commonContainerSpec);
- LOG.info("ContainerProxy started:" + id);
- }
- }
-
- private void stopTaskRunners(Collection<Container> containers) {
- for (Container container : containers) {
- final ContainerProxy proxy = context.getResourceAllocator().getContainer(container.getId());
- executorService.submit(new StopContainerRunner(container.getId(), proxy));
- }
- }
-
- private static class StopContainerRunner implements Runnable {
- private final ContainerProxy proxy;
- private final ContainerId id;
- public StopContainerRunner(ContainerId id, ContainerProxy proxy) {
- this.id = id;
- this.proxy = proxy;
- }
-
- @Override
- public void run() {
- proxy.stopContainer();
- LOG.info("ContainerProxy stopped:" + id);
- }
- }
-
-
- /**
- * Lock this on initialClasspath so that there is only one fork in the AM for
- * getting the initial class-path. TODO: We already construct
- * a parent CLC and use it for all the containers, so this should go away
- * once the mr-generated-classpath stuff is gone.
- */
- private static String getInitialClasspath(Configuration conf) {
- synchronized (classpathLock) {
- if (initialClasspathFlag.get()) {
- return initialClasspath;
- }
- Map<String, String> env = new HashMap<String, String>();
-
- initialClasspath = env.get(Environment.CLASSPATH.name());
- initialClasspathFlag.set(true);
- return initialClasspath;
- }
- }
-
-// public class TaskRunnerContainerProxy extends ContainerProxy {
-// private final ExecutionBlockId executionBlockId;
-//
-// public TaskRunnerContainerProxy(QueryMasterTask.QueryContext context, Configuration conf, YarnRPC yarnRPC,
-// Container container, ExecutionBlockId executionBlockId) {
-// super(context, conf, yarnRPC, container);
-// this.executionBlockId = executionBlockId;
-// }
-//
-// @Override
-// protected void containerStarted() {
-// context.getEventHandler().handle(new QueryEvent(context.getQueryId(), QueryEventType.INIT_COMPLETED));
-// }
-//
-// @Override
-// protected String getId() {
-// return executionBlockId.toString();
-// }
-//
-// @Override
-// protected String getRunnerClass() {
-// return TaskRunner.class.getCanonicalSignature();
-// }
-//
-// @Override
-// protected Vector<CharSequence> getTaskParams() {
-// Vector<CharSequence> taskParams = new Vector<CharSequence>();
-// taskParams.add(queryMasterHost); // queryMaster hostname
-// taskParams.add(String.valueOf(queryMasterPort)); // queryMaster port
-//
-// return taskParams;
-// }
-// }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 23b0def..39ea430 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -37,12 +37,16 @@ import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.parser.HiveQLAnalyzer;
import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.LogicalOptimizer;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.UnimplementedException;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.GlobalEngine;
import org.apache.tajo.master.TajoAsyncDispatcher;
@@ -58,7 +62,6 @@ import org.apache.tajo.util.metrics.TajoMetrics;
import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
import org.apache.tajo.worker.AbstractResourceAllocator;
import org.apache.tajo.worker.TajoResourceAllocator;
-import org.apache.tajo.worker.YarnResourceAllocator;
import java.io.IOException;
import java.util.HashMap;
@@ -137,7 +140,7 @@ public class QueryMasterTask extends CompositeService {
if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) >= 0) {
resourceAllocator = new TajoResourceAllocator(queryTaskContext);
} else {
- resourceAllocator = new YarnResourceAllocator(queryTaskContext);
+ throw new UnimplementedException(resourceManagerClassName + " is not supported yet");
}
addService(resourceAllocator);
http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
deleted file mode 100644
index b9e132b..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.rm;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.master.event.ContainerAllocationEvent;
-import org.apache.tajo.master.event.ContainerAllocatorEventType;
-import org.apache.tajo.master.event.SubQueryContainerAllocationEvent;
-import org.apache.tajo.master.querymaster.Query;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
-import org.apache.tajo.master.querymaster.SubQuery;
-import org.apache.tajo.master.querymaster.SubQueryState;
-import org.apache.tajo.util.ApplicationIdUtils;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class YarnRMContainerAllocator extends AMRMClientImpl
- implements EventHandler<ContainerAllocationEvent> {
-
- /** Class Logger */
- private static final Log LOG = LogFactory.getLog(YarnRMContainerAllocator.
- class.getName());
-
- private QueryMasterTask.QueryMasterTaskContext context;
- private ApplicationAttemptId appAttemptId;
- private final EventHandler eventHandler;
-
- public YarnRMContainerAllocator(QueryMasterTask.QueryMasterTaskContext context) {
- super();
- this.context = context;
- this.appAttemptId = ApplicationIdUtils.createApplicationAttemptId(context.getQueryId());
- this.eventHandler = context.getDispatcher().getEventHandler();
- }
-
- public void init(Configuration conf) {
- super.init(conf);
- }
-
- private static final int WAIT_INTERVAL_AVAILABLE_NODES = 500; // 0.5 second
- public void start() {
- super.start();
-
- RegisterApplicationMasterResponse response;
- try {
- response = registerApplicationMaster("localhost", 10080, "http://localhost:1234");
-
- // If the number of cluster nodes is ZERO, it waits for available nodes.
- AllocateResponse allocateResponse = allocate(0.0f);
- while(allocateResponse.getNumClusterNodes() < 1) {
- try {
- Thread.sleep(WAIT_INTERVAL_AVAILABLE_NODES);
- LOG.info("Waiting for Available Cluster Nodes");
- allocateResponse = allocate(0);
- } catch (InterruptedException e) {
- LOG.error(e);
- }
- }
- context.getQueryMasterContext().getWorkerContext().setNumClusterNodes(allocateResponse.getNumClusterNodes());
- } catch (IOException e) {
- LOG.error(e);
- } catch (YarnException e) {
- LOG.error(e);
- }
-
- startAllocatorThread();
- }
-
- protected Thread allocatorThread;
- private final AtomicBoolean stopped = new AtomicBoolean(false);
- private int rmPollInterval = 100;//millis
-
- protected void startAllocatorThread() {
- allocatorThread = new Thread(new Runnable() {
- @Override
- public void run() {
- while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
- try {
- try {
- heartbeat();
- } catch (YarnException e) {
- LOG.error("Error communicating with RM: " + e.getMessage() , e);
- return;
- } catch (Exception e) {
- LOG.error("ERROR IN CONTACTING RM. ", e);
- // TODO: for other exceptions
- if(stopped.get()) {
- break;
- }
- }
- Thread.sleep(rmPollInterval);
- } catch (InterruptedException e) {
- if (!stopped.get()) {
- LOG.warn("Allocated thread interrupted. Returning.");
- }
- break;
- }
- }
- LOG.info("Allocated thread stopped");
- }
- });
- allocatorThread.setName("YarnRMContainerAllocator");
- allocatorThread.start();
- }
-
- public void stop() {
- if(stopped.get()) {
- return;
- }
- LOG.info("un-registering ApplicationMaster(QueryMaster):" + appAttemptId);
- stopped.set(true);
-
- try {
- FinalApplicationStatus status = FinalApplicationStatus.UNDEFINED;
- Query query = context.getQuery();
- if (query != null) {
- TajoProtos.QueryState state = query.getState();
- if (state == TajoProtos.QueryState.QUERY_SUCCEEDED) {
- status = FinalApplicationStatus.SUCCEEDED;
- } else if (state == TajoProtos.QueryState.QUERY_FAILED || state == TajoProtos.QueryState.QUERY_ERROR) {
- status = FinalApplicationStatus.FAILED;
- } else if (state == TajoProtos.QueryState.QUERY_ERROR) {
- status = FinalApplicationStatus.FAILED;
- }
- }
- unregisterApplicationMaster(status, "tajo query finished", null);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
-
- allocatorThread.interrupt();
- LOG.info("un-registered ApplicationMAster(QueryMaster) stopped:" + appAttemptId);
-
- super.stop();
- }
-
- private final Map<Priority, ExecutionBlockId> subQueryMap =
- new HashMap<Priority, ExecutionBlockId>();
-
- private AtomicLong prevReportTime = new AtomicLong(0);
- private int reportInterval = 5 * 1000; // second
-
- public void heartbeat() throws Exception {
- AllocateResponse allocateResponse = allocate(context.getProgress());
-
- List<Container> allocatedContainers = allocateResponse.getAllocatedContainers();
-
- long currentTime = System.currentTimeMillis();
- if ((currentTime - prevReportTime.longValue()) >= reportInterval) {
- LOG.debug("Available Cluster Nodes: " + allocateResponse.getNumClusterNodes());
- LOG.debug("Num of Allocated Containers: " + allocatedContainers.size());
- LOG.info("Available Resource: " + allocateResponse.getAvailableResources());
- prevReportTime.set(currentTime);
- }
-
- if (allocatedContainers.size() > 0) {
- LOG.info("================================================================");
- for (Container container : allocateResponse.getAllocatedContainers()) {
- LOG.info("> Container Id: " + container.getId());
- LOG.info("> Node Id: " + container.getNodeId());
- LOG.info("> Resource (Mem): " + container.getResource().getMemory());
- LOG.info("> Priority: " + container.getPriority());
- }
- LOG.info("================================================================");
-
- Map<ExecutionBlockId, List<Container>> allocated = new HashMap<ExecutionBlockId, List<Container>>();
- for (Container container : allocatedContainers) {
- ExecutionBlockId executionBlockId = subQueryMap.get(container.getPriority());
- SubQueryState state = context.getSubQuery(executionBlockId).getState();
- if (!(SubQuery.isRunningState(state))) {
- releaseAssignedContainer(container.getId());
- } else {
- if (allocated.containsKey(executionBlockId)) {
- allocated.get(executionBlockId).add(container);
- } else {
- allocated.put(executionBlockId, Lists.newArrayList(container));
- }
- }
- }
-
- for (Entry<ExecutionBlockId, List<Container>> entry : allocated.entrySet()) {
- eventHandler.handle(new SubQueryContainerAllocationEvent(entry.getKey(), entry.getValue()));
- }
- }
- }
-
- @Override
- public void handle(ContainerAllocationEvent event) {
-
- if (event.getType() == ContainerAllocatorEventType.CONTAINER_REQ) {
- LOG.info(event);
- subQueryMap.put(event.getPriority(), event.getExecutionBlockId());
- addContainerRequest(new ContainerRequest(event.getCapability(), null, null,
- event.getPriority()));
-
- } else if (event.getType() == ContainerAllocatorEventType.CONTAINER_DEALLOCATE) {
- LOG.info(event);
- } else {
- LOG.info(event);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
deleted file mode 100644
index 6d5268c..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.rm;
-
-import com.google.protobuf.RpcCallback;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.YarnClientApplication;
-import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.proto.YarnProtos;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.exception.UnimplementedException;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.YarnContainerProxy;
-import org.apache.tajo.master.querymaster.QueryInProgress;
-import org.apache.tajo.util.ApplicationIdUtils;
-import org.apache.tajo.worker.TajoWorker;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
-import java.util.*;
-
-import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
-import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerResourceAllocationResponse;
-
-public class YarnTajoResourceManager extends AbstractService implements WorkerResourceManager {
- private static final Log LOG = LogFactory.getLog(YarnTajoResourceManager.class);
-
- private YarnClient yarnClient;
- private ApplicationMasterProtocol rmClient;
- private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
- private Configuration conf;
- private TajoMaster.MasterContext masterContext;
-
- public YarnTajoResourceManager() {
- super(YarnTajoResourceManager.class.getSimpleName());
- }
-
- public YarnTajoResourceManager(TajoMaster.MasterContext masterContext) {
- super(YarnTajoResourceManager.class.getSimpleName());
- this.masterContext = masterContext;
- }
-
- @Override
- public void stop() {
- }
-
- @Override
- public Map<String, Worker> getWorkers() {
- return new HashMap<String, Worker>();
- }
-
- @Override
- public Map<String, Worker> getInactiveWorkers() {
- return new HashMap<String, Worker>();
- }
-
- public Collection<String> getQueryMasters() {
- return new ArrayList<String>();
- }
-
- public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
- return TajoMasterProtocol.ClusterResourceSummary.newBuilder()
- .setNumWorkers(0)
- .setTotalCpuCoreSlots(0)
- .setTotalDiskSlots(0)
- .setTotalMemoryMB(0)
- .setTotalAvailableCpuCoreSlots(0)
- .setTotalAvailableDiskSlots(0)
- .setTotalAvailableMemoryMB(0)
- .build();
- }
-
- @Override
- public void releaseWorkerResource(YarnProtos.ContainerIdProto containerId) {
- throw new UnimplementedException("releaseWorkerResource");
- }
-
- @Override
- public WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress) {
- throw new UnimplementedException("allocateQueryMaster");
- }
-
- @Override
- public void allocateWorkerResources(
- TajoMasterProtocol.WorkerResourceAllocationRequest request,
- RpcCallback<WorkerResourceAllocationResponse> rpcCallBack) {
- throw new UnimplementedException("allocateWorkerResources");
- }
-
- @Override
- public void init(Configuration conf) {
- this.conf = conf;
- connectYarnClient();
-
- final YarnConfiguration yarnConf = new YarnConfiguration(conf);
- final YarnRPC rpc = YarnRPC.create(conf);
- final InetSocketAddress rmAddress = conf.getSocketAddr(
- YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
-
- UserGroupInformation currentUser;
- try {
- currentUser = UserGroupInformation.getCurrentUser();
- } catch (IOException e) {
- throw new YarnRuntimeException(e);
- }
-
- rmClient = currentUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
- @Override
- public ApplicationMasterProtocol run() {
- return (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol.class, rmAddress, yarnConf);
- }
- });
- }
-
- @Override
- public String getSeedQueryId() throws IOException {
- try {
- YarnClientApplication app = yarnClient.createApplication();
- return app.getApplicationSubmissionContext().getApplicationId().toString();
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
-
- throw new IOException(e.getMessage(), e);
- }
- }
-
- @Override
- public void stopQueryMaster(QueryId queryId) {
- try {
- FinalApplicationStatus appStatus = FinalApplicationStatus.UNDEFINED;
- QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId);
- if(queryInProgress == null) {
- return;
- }
- TajoProtos.QueryState state = queryInProgress.getQueryInfo().getQueryState();
- if (state == TajoProtos.QueryState.QUERY_SUCCEEDED) {
- appStatus = FinalApplicationStatus.SUCCEEDED;
- } else if (state == TajoProtos.QueryState.QUERY_FAILED || state == TajoProtos.QueryState.QUERY_ERROR) {
- appStatus = FinalApplicationStatus.FAILED;
- } else if (state == TajoProtos.QueryState.QUERY_ERROR) {
- appStatus = FinalApplicationStatus.FAILED;
- }
- FinishApplicationMasterRequest request = recordFactory
- .newRecordInstance(FinishApplicationMasterRequest.class);
- request.setFinalApplicationStatus(appStatus);
- request.setDiagnostics("QueryMaster shutdown by TajoMaster.");
- rmClient.finishApplicationMaster(request);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- private void connectYarnClient() {
- this.yarnClient = new YarnClientImpl();
- this.yarnClient.init(conf);
- this.yarnClient.start();
- }
-
- private ApplicationAttemptId allocateAndLaunchQueryMaster(QueryInProgress queryInProgress) throws IOException, YarnException {
- QueryId queryId = queryInProgress.getQueryId();
- ApplicationId appId = ApplicationIdUtils.queryIdToAppId(queryId);
-
- LOG.info("Allocate and launch ApplicationMaster for QueryMaster: queryId=" +
- queryId + ", appId=" + appId);
-
- ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
-
- // set the application id
- appContext.setApplicationId(appId);
- // set the application name
- appContext.setApplicationName("Tajo");
-
- Priority pri = Records.newRecord(Priority.class);
- pri.setPriority(5);
- appContext.setPriority(pri);
-
- // Set the queue to which this application is to be submitted in the RM
- appContext.setQueue("default");
-
- ContainerLaunchContext commonContainerLaunchContext =
- YarnContainerProxy.createCommonContainerLaunchContext(masterContext.getConf(), queryId.toString(), true);
-
- // Setup environment by cloning from common env.
- Map<String, String> env = commonContainerLaunchContext.getEnvironment();
- Map<String, String> myEnv = new HashMap<String, String>(env.size());
- myEnv.putAll(env);
-
- ////////////////////////////////////////////////////////////////////////////
- // Set the local resources
- ////////////////////////////////////////////////////////////////////////////
- // Set the necessary command to execute the application master
- Vector<CharSequence> vargs = new Vector<CharSequence>(30);
-
- // Set java executable command
- //LOG.info("Setting up app master command");
- vargs.add("${JAVA_HOME}" + "/bin/java");
- // Set Xmx based on am memory size
- String jvmOptions = masterContext.getConf().get("tajo.rm.yarn.querymaster.jvm.option", "-Xmx2000m");
-
- for(String eachToken: jvmOptions.split((" "))) {
- vargs.add(eachToken);
- }
- // Set Remote Debugging
- //if (!context.getQuery().getSubQuery(event.getExecutionBlockId()).isLeafQuery()) {
- //vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
- //}
- // Set class name
- vargs.add(TajoWorker.class.getCanonicalName());
- vargs.add("qm");
- vargs.add(queryId.toString()); // queryId
- vargs.add(masterContext.getTajoMasterService().getBindAddress().getHostName() + ":" +
- masterContext.getTajoMasterService().getBindAddress().getPort());
-
- vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
- vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
-
- // Get final commmand
- StringBuilder command = new StringBuilder();
- for (CharSequence str : vargs) {
- command.append(str).append(" ");
- }
-
- LOG.info("Completed setting up QueryMasterRunner command " + command.toString());
- List<String> commands = new ArrayList<String>();
- commands.add(command.toString());
-
- final Resource resource = Records.newRecord(Resource.class);
- // TODO - get default value from conf
- resource.setMemory(2000);
- resource.setVirtualCores(1);
-
- Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
-
- ContainerLaunchContext masterContainerContext = BuilderUtils.newContainerLaunchContext(
- commonContainerLaunchContext.getLocalResources(),
- myEnv,
- commands,
- myServiceData,
- null,
- new HashMap<ApplicationAccessType, String>(2)
- );
-
- appContext.setAMContainerSpec(masterContainerContext);
-
- LOG.info("Submitting QueryMaster to ResourceManager");
- yarnClient.submitApplication(appContext);
-
- ApplicationReport appReport = monitorApplication(appId, EnumSet.of(YarnApplicationState.ACCEPTED));
- ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
-
- LOG.info("Launching QueryMaster with appAttemptId: " + attemptId);
-
- return attemptId;
- }
-
- private ApplicationReport monitorApplication(ApplicationId appId,
- Set<YarnApplicationState> finalState) throws IOException, YarnException {
-
- long sleepTime = 100;
- int count = 1;
- while (true) {
- // Get application report for the appId we are interested in
- ApplicationReport report = yarnClient.getApplicationReport(appId);
-
- LOG.info("Got application report from ASM for" + ", appId="
- + appId.getId() + ", appAttemptId="
- + report.getCurrentApplicationAttemptId() + ", clientToken="
- + report.getClientToAMToken() + ", appDiagnostics="
- + report.getDiagnostics() + ", appMasterHost=" + report.getHost()
- + ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
- + report.getRpcPort() + ", appStartTime=" + report.getStartTime()
- + ", yarnAppState=" + report.getYarnApplicationState().toString()
- + ", distributedFinalState="
- + report.getFinalApplicationStatus().toString() + ", appTrackingUrl="
- + report.getTrackingUrl() + ", appUser=" + report.getUser());
-
- YarnApplicationState state = report.getYarnApplicationState();
- if (finalState.contains(state)) {
- return report;
- }
- try {
- Thread.sleep(sleepTime);
- sleepTime = count * 100;
- if(count < 10) {
- count++;
- }
- } catch (InterruptedException e) {
- //LOG.debug("Thread sleep in monitoring loop interrupted");
- }
- }
- }
-
- public boolean isQueryMasterStopped(QueryId queryId) {
- ApplicationId appId = ApplicationIdUtils.queryIdToAppId(queryId);
- try {
- ApplicationReport report = yarnClient.getApplicationReport(appId);
- YarnApplicationState state = report.getYarnApplicationState();
- return EnumSet.of(
- YarnApplicationState.FINISHED,
- YarnApplicationState.KILLED,
- YarnApplicationState.FAILED).contains(state);
- } catch (YarnException e) {
- LOG.error(e.getMessage(), e);
- return false;
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-core/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
deleted file mode 100644
index 1771255..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.proto.YarnProtos;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.TaskRunnerGroupEvent;
-import org.apache.tajo.master.TaskRunnerLauncher;
-import org.apache.tajo.master.YarnTaskRunnerLauncherImpl;
-import org.apache.tajo.master.event.ContainerAllocatorEventType;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
-import org.apache.tajo.master.rm.YarnRMContainerAllocator;
-
-public class YarnResourceAllocator extends AbstractResourceAllocator {
- private YarnRMContainerAllocator rmAllocator;
-
- private TaskRunnerLauncher taskRunnerLauncher;
-
- private YarnRPC yarnRPC;
-
- private YarnClient yarnClient;
-
- private static final Log LOG = LogFactory.getLog(YarnResourceAllocator.class.getName());
-
- private QueryMasterTask.QueryMasterTaskContext queryTaskContext;
-
- private TajoConf systemConf;
-
- public YarnResourceAllocator(QueryMasterTask.QueryMasterTaskContext queryTaskContext) {
- this.queryTaskContext = queryTaskContext;
- }
-
- @Override
- public ContainerId makeContainerId(YarnProtos.ContainerIdProto containerId) {
- return new ContainerIdPBImpl(containerId);
- }
-
- @Override
- public void allocateTaskWorker() {
- }
-
- @Override
- public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext,
- int numTasks,
- int memoryMBPerTask) {
- int numClusterNodes = workerContext.getNumClusterNodes();
-
- TajoConf conf = (TajoConf)workerContext.getQueryMaster().getConfig();
- int workerNum = conf.getIntVar(TajoConf.ConfVars.YARN_RM_WORKER_NUMBER_PER_NODE);
- return numClusterNodes == 0 ? numTasks: Math.min(numTasks, numClusterNodes * workerNum);
- }
-
- @Override
- public void init(Configuration conf) {
- systemConf = (TajoConf)conf;
-
- yarnRPC = YarnRPC.create(systemConf);
-
- connectYarnClient();
-
- taskRunnerLauncher = new YarnTaskRunnerLauncherImpl(queryTaskContext, yarnRPC);
- addService((Service) taskRunnerLauncher);
- queryTaskContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, taskRunnerLauncher);
-
- rmAllocator = new YarnRMContainerAllocator(queryTaskContext);
- addService(rmAllocator);
- queryTaskContext.getDispatcher().register(ContainerAllocatorEventType.class, rmAllocator);
- super.init(conf);
- }
-
- @Override
- public void stop() {
- try {
- this.yarnClient.stop();
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- super.stop();
- }
-
- @Override
- public void start() {
- super.start();
- }
-
- private void connectYarnClient() {
- this.yarnClient = new YarnClientImpl();
- this.yarnClient.init(systemConf);
- this.yarnClient.start();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index ed5e4bc..010faa8 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -40,7 +40,6 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
-import org.apache.tajo.master.rm.YarnTajoResourceManager;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.worker.TajoWorker;
@@ -93,12 +92,7 @@ public class TajoTestingCluster {
void initPropertiesAndConfigs() {
if (System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname) != null) {
String testResourceManager = System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname);
- Preconditions.checkState(
- testResourceManager.equals(TajoWorkerResourceManager.class.getCanonicalName()) ||
- testResourceManager.equals(YarnTajoResourceManager.class.getCanonicalName()),
- ConfVars.RESOURCE_MANAGER_CLASS.varname + " must be either " + TajoWorkerResourceManager.class.getCanonicalName() + " or " +
- YarnTajoResourceManager.class.getCanonicalName() +"."
- );
+ Preconditions.checkState(testResourceManager.equals(TajoWorkerResourceManager.class.getCanonicalName()));
conf.set(ConfVars.RESOURCE_MANAGER_CLASS.varname, System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname));
}
conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 1024);
http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-maven-plugins/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-maven-plugins/pom.xml b/tajo-maven-plugins/pom.xml
index 7de67ef..5b7c63d 100644
--- a/tajo-maven-plugins/pom.xml
+++ b/tajo-maven-plugins/pom.xml
@@ -26,6 +26,7 @@
<artifactId>tajo-maven-plugins</artifactId>
<packaging>maven-plugin</packaging>
<name>Tajo Maven Plugins</name>
+ <version>0.8.0-SNAPSHOT</version>
<properties>
<maven.dependency.version>3.0</maven.dependency.version>
</properties>
http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index 8e30759..cf0d538 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -554,6 +554,12 @@
</lifecycleMappingMetadata>
</configuration>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-maven-plugins</artifactId>
+ <version>${tajo.version}</version>
+ </plugin>
</plugins>
</pluginManagement>