You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2014/01/10 16:27:09 UTC
[2/2] git commit: resolving conflicts
resolving conflicts
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f282a300
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f282a300
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f282a300
Branch: refs/heads/helix-provisioning
Commit: f282a30039223a2068350cac12596b55c55dedca
Parents: 9a1ba91 27f6272
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Fri Jan 10 07:26:58 2014 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Fri Jan 10 07:26:58 2014 -0800
----------------------------------------------------------------------
.../provisioner/ContainerProvider.java | 10 +-
.../controller/provisioner/ContainerState.java | 4 +-
.../stages/ContainerProvisioningStage.java | 150 +++++++++++++------
.../integration/TestLocalContainerProvider.java | 28 +++-
.../provisioning/yarn/ContainerParticipant.java | 15 ++
.../yarn/GenericApplicationMaster.java | 27 ++--
.../provisioning/yarn/YarnProvisioner.java | 89 ++++++-----
.../yarn/YarnProvisionerConfig.java | 46 ++++++
8 files changed, 254 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/f282a300/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
index 0000000,0000000..b0272e8
new file mode 100644
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
@@@ -1,0 -1,0 +1,15 @@@
++package org.apache.helix.provisioning.yarn;
++
++import java.util.Arrays;
++
++import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
++
++/**
++ * Entry point that logs the arguments it was started with and then blocks,
++ * keeping the process alive.
++ */
++public class ContainerParticipant {
++  private static final Log LOG = LogFactory.getLog(ContainerParticipant.class);
++
++  /**
++   * Logs the command-line arguments and then parks the main thread.
++   * @param args command-line arguments; they are only logged, not interpreted
++   * @throws InterruptedException if the main thread is interrupted while blocked
++   */
++  public static void main(String[] args) throws InterruptedException {
++    final String startupArgs = Arrays.toString(args);
++    LOG.info("Starting participant: " + startupArgs);
++
++    // A thread joining on itself never completes normally, so this call only
++    // returns by throwing InterruptedException.
++    final Thread self = Thread.currentThread();
++    self.join();
++  }
++}
http://git-wip-us.apache.org/repos/asf/helix/blob/f282a300/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
index 1b27378,3b4e937..3adffd6
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
@@@ -98,17 -93,20 +94,15 @@@ public class GenericApplicationMaster
// Tracking url to which app master publishes info for clients to monitor
private String appMasterTrackingUrl = "";
-
- // Counter for completed containers ( complete denotes successful or failed )
- AtomicInteger numCompletedContainers = new AtomicInteger();
- // Allocated container count so that we know how many containers has the RM
- // allocated to us
- AtomicInteger numAllocatedContainers = new AtomicInteger();
- // Count of failed containers
- AtomicInteger numFailedContainers = new AtomicInteger();
- // Count of containers already requested from the RM
- // Needed as once requested, we should not request for containers again.
- // Only request for more if the original requirement changes.
- AtomicInteger numRequestedContainers = new AtomicInteger();
Map<ContainerRequest, SettableFuture<ContainerAskResponse>> containerRequestMap =
new LinkedHashMap<AMRMClient.ContainerRequest, SettableFuture<ContainerAskResponse>>();
+ Map<ContainerId, SettableFuture<ContainerReleaseResponse>> containerReleaseMap =
+ new LinkedHashMap<ContainerId, SettableFuture<ContainerReleaseResponse>>();
+ Map<ContainerId, SettableFuture<ContainerStopResponse>> containerStopMap =
+ new LinkedHashMap<ContainerId, SettableFuture<ContainerStopResponse>>();
+ Map<ContainerId, SettableFuture<ContainerLaunchResponse>> containerLaunchResponseMap =
+ new LinkedHashMap<ContainerId, SettableFuture<ContainerLaunchResponse>>();
-
ByteBuffer allTokens;
// Launch threads
@@@ -120,7 -117,7 +113,7 @@@
// Set up the configuration
conf = new YarnConfiguration();
}
--
++
/**
* Dump out contents of $CWD and the environment to stdout for debugging
*/
@@@ -154,15 -151,15 +147,13 @@@
}
}
--
--
/**
* Parse command line options
* @param args Command line args
* @return Whether init successful and run should be invoked
* @throws ParseException
* @throws IOException
-- * @throws YarnException
++ * @throws YarnException
*/
public boolean start() throws ParseException, IOException, YarnException {
@@@ -234,33 -231,30 +225,35 @@@
return true;
}
--
- public Future<ContainerAskResponse> acquireContainer(ContainerRequest containerAsk) {
+ public ListenableFuture<ContainerAskResponse> acquireContainer(ContainerRequest containerAsk) {
+ amRMClient.addContainerRequest(containerAsk);
- numRequestedContainers.incrementAndGet();
SettableFuture<ContainerAskResponse> future = SettableFuture.create();
+ containerRequestMap.put(containerAsk, future);
+ amRMClient.addContainerRequest(containerAsk);
return future;
}
- public Future<ContainerStopResponse> stopContainer(Container container) {
+ public ListenableFuture<ContainerStopResponse> stopContainer(Container container) {
+ nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
SettableFuture<ContainerStopResponse> future = SettableFuture.create();
+ containerStopMap.put(container.getId(), future);
+ nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
return future;
}
- public Future<ContainerReleaseResponse> releaseContainer(Container container) {
+ public ListenableFuture<ContainerReleaseResponse> releaseContainer(Container container) {
+ amRMClient.releaseAssignedContainer(container.getId());
SettableFuture<ContainerReleaseResponse> future = SettableFuture.create();
+ containerReleaseMap.put(container.getId(), future);
+ amRMClient.releaseAssignedContainer(container.getId());
return future;
}
- public Future<ContainerLaunchResponse> launchContainer(Container container,
+ public ListenableFuture<ContainerLaunchResponse> launchContainer(Container container,
ContainerLaunchContext containerLaunchContext) {
- nmClientAsync.startContainerAsync(container, containerLaunchContext);
SettableFuture<ContainerLaunchResponse> future = SettableFuture.create();
+ containerLaunchResponseMap.put(container.getId(), future);
+ nmClientAsync.startContainerAsync(container, containerLaunchContext);
return future;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/f282a300/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
index f983255,e921c87..f74e312
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
@@@ -1,38 -1,14 +1,39 @@@
package org.apache.helix.provisioning.yarn;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+ import java.util.concurrent.Executors;
+import org.apache.commons.compress.archivers.ArchiveStreamFactory;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@@@ -50,178 -23,70 +51,190 @@@ import org.apache.helix.controller.prov
import org.apache.helix.controller.provisioner.Provisioner;
import org.apache.helix.controller.provisioner.TargetProviderResponse;
+import com.google.common.collect.Lists;
+ import com.google.common.base.Function;
+ import com.google.common.util.concurrent.Futures;
+ import com.google.common.util.concurrent.ListenableFuture;
+ import com.google.common.util.concurrent.ListeningExecutorService;
+ import com.google.common.util.concurrent.MoreExecutors;
public class YarnProvisioner implements Provisioner {
private static final Log LOG = LogFactory.getLog(YarnProvisioner.class);
static GenericApplicationMaster applicationMaster;
- static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
++ static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors
++ .newCachedThreadPool());
Map<ContainerId, Container> allocatedContainersMap = new HashMap<ContainerId, Container>();
+ int DEFAULT_CONTAINER = 4;
@Override
- public ContainerId allocateContainer(ContainerSpec spec) {
+ public ListenableFuture<ContainerId> allocateContainer(ContainerSpec spec) {
ContainerRequest containerAsk = setupContainerAskForRM(spec);
- Future<ContainerAskResponse> requestNewContainer =
+ ListenableFuture<ContainerAskResponse> requestNewContainer =
applicationMaster.acquireContainer(containerAsk);
- ContainerAskResponse containerAskResponse;
- try {
- containerAskResponse = requestNewContainer.get();
- ContainerId helixContainerId =
- ContainerId.from(containerAskResponse.getContainer().getId().toString());
- allocatedContainersMap.put(helixContainerId, containerAskResponse.getContainer());
- return helixContainerId;
- } catch (Exception e) {
- LOG.error("Exception in allocateContainer for spec:" + spec, e);
- }
- return null;
- return Futures.transform(requestNewContainer, new Function<ContainerAskResponse, ContainerId>() {
- @Override
- public ContainerId apply(ContainerAskResponse containerAskResponse) {
- ContainerId helixContainerId =
- ContainerId.from(containerAskResponse.getContainer().getId().toString());
- allocatedContainersMap.put(helixContainerId, containerAskResponse.getContainer());
- return helixContainerId;
- }
- });
++ return Futures.transform(requestNewContainer,
++ new Function<ContainerAskResponse, ContainerId>() {
++ @Override
++ public ContainerId apply(ContainerAskResponse containerAskResponse) {
++ ContainerId helixContainerId =
++ ContainerId.from(containerAskResponse.getContainer().getId().toString());
++ allocatedContainersMap.put(helixContainerId, containerAskResponse.getContainer());
++ return helixContainerId;
++ }
++ });
++
}
@Override
- public boolean deallocateContainer(ContainerId containerId) {
- Future<ContainerReleaseResponse> releaseContainer =
+ public ListenableFuture<Boolean> deallocateContainer(ContainerId containerId) {
+ ListenableFuture<ContainerReleaseResponse> releaseContainer =
applicationMaster.releaseContainer(allocatedContainersMap.get(containerId));
- try {
- releaseContainer.get();
- return true;
- } catch (Exception e) {
- LOG.error("Exception in deallocateContainer containerId:" + containerId, e);
- }
- return false;
+ return Futures.transform(releaseContainer, new Function<ContainerReleaseResponse, Boolean>() {
+ @Override
+ public Boolean apply(ContainerReleaseResponse response) {
+ return response != null;
+ }
+ }, service);
++
}
@Override
- public boolean startContainer(ContainerId containerId) {
+ public ListenableFuture<Boolean> startContainer(final ContainerId containerId) {
Container container = allocatedContainersMap.get(containerId);
- ContainerLaunchContext containerLaunchContext = Records.newRecord(ContainerLaunchContext.class);
- ListenableFuture<ContainerLaunchResponse> future = applicationMaster.launchContainer(container, containerLaunchContext);
++ ContainerLaunchContext launchContext;
+ try {
- Future<ContainerLaunchResponse> launchContainer =
- applicationMaster.launchContainer(container, createLaunchContext(containerId));
- ContainerLaunchResponse containerLaunchResponse = launchContainer.get();
- return true;
++ launchContext = createLaunchContext(containerId);
+ } catch (Exception e) {
- LOG.error("Exception while starting container containerId:" + containerId, e);
++ LOG.error("Exception while creating context to launch container:" + containerId, e);
++ return null;
+ }
- return false;
++ ListenableFuture<ContainerLaunchResponse> future =
++ applicationMaster.launchContainer(container, launchContext);
+ return Futures.transform(future, new Function<ContainerLaunchResponse, Boolean>() {
+ @Override
+ public Boolean apply(ContainerLaunchResponse response) {
+ return response != null;
+ }
+ }, service);
}
+ /**
+  * Builds the YARN launch context used to start the participant process for a container.
+  * <p>
+  * The application name, application id, extra classpath and participant main class are read
+  * from this process's environment variables (appName, appId, appClasspath,
+  * containerParticipantMainClass). The application package
+  * (&lt;home&gt;/&lt;appName&gt;/&lt;appId&gt;/app-pkg.tar on the configured file system) is registered as
+  * an archive local resource, and the environment and java command line of the participant
+  * are assembled.
+  * @param containerId id handed to the participant through --participantId
+  * @return the populated launch context
+  * @throws Exception if a required environment variable is missing or the application package
+  *           cannot be looked up on the file system
+  */
+ private ContainerLaunchContext createLaunchContext(ContainerId containerId) throws Exception {
+
+   ContainerLaunchContext participantContainer = Records.newRecord(ContainerLaunchContext.class);
+
+   Map<String, String> envs = System.getenv();
+   String appName = requireEnv(envs, "appName");
+   String appId = requireEnv(envs, "appId");
+   // optional: only appended to the classpath when present
+   String appClasspath = envs.get("appClasspath");
+   String containerParticipantMainClass = requireEnv(envs, "containerParticipantMainClass");
+   // NOTE(review): assumes ZooKeeper is reachable on port 2181 of the node manager host - confirm
+   String zkAddress = envs.get(Environment.NM_HOST.name()) + ":2181";
+
+   // locate the application package that the container has to localize
+   YarnConfiguration conf = new YarnConfiguration();
+   FileSystem fs = FileSystem.get(conf);
+   String pathSuffix = appName + "/" + appId + "/app-pkg.tar";
+   Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
+   FileStatus destStatus = fs.getFileStatus(dst);
+
+   // set the local resources needed to launch the container
+   LocalResource appPkgRsrc = Records.newRecord(LocalResource.class);
+   // the package is registered as an ARCHIVE resource (archives are unpacked at destination)
+   appPkgRsrc.setType(LocalResourceType.ARCHIVE);
+   appPkgRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+   // Set the resource to be copied over
+   appPkgRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+   // Timestamp and length let the framework sanity-check that the localized copy is the
+   // same resource that was registered here
+   appPkgRsrc.setTimestamp(destStatus.getModificationTime());
+   appPkgRsrc.setSize(destStatus.getLen());
+
+   Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+   localResources.put("app-pkg", appPkgRsrc);
+   participantContainer.setLocalResources(localResources);
+
+   // Set the necessary security tokens as needed
+   // participantContainer.setContainerTokens(containerToken);
+
+   // Set the env variables to be set up in the env where the participant will be run
+   LOG.info("Set the environment for the container participant");
+   Map<String, String> env = new HashMap<String, String>();
+   env.put("app-pkg-path", dst.getName());
+
+   // At some point we should not be required to add the hadoop specific classpaths to the
+   // env; for now all required classpaths are set explicitly, including "./*" for the
+   // localized application files
+   StringBuilder classPathEnv =
+       new StringBuilder(Environment.CLASSPATH.$()).append(File.pathSeparatorChar).append("./*");
+   if (appClasspath != null) {
+     classPathEnv.append(File.pathSeparatorChar).append(appClasspath);
+   }
+   for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+       YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+     classPathEnv.append(File.pathSeparatorChar);
+     classPathEnv.append(c.trim());
+   }
+   classPathEnv.append(File.pathSeparatorChar).append("./log4j.properties");
+
+   // add the runtime classpath needed for tests to work
+   if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+     classPathEnv.append(File.pathSeparatorChar);
+     classPathEnv.append(System.getProperty("java.class.path"));
+   }
+   LOG.info("classpath: " + classPathEnv);
+   env.put("CLASSPATH", classPathEnv.toString());
+
+   participantContainer.setEnvironment(env);
+
+   // Assemble the command that starts the participant JVM
+   LOG.info("Setting up container participant command");
+   List<String> vargs = new ArrayList<String>();
+   vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
+   // heap size is currently fixed at 1024 MB
+   vargs.add("-Xmx" + 1024 + "m");
+   // Set class name
+   vargs.add(containerParticipantMainClass);
+   // Set params for container participant
+   vargs.add("--zk_address " + zkAddress);
+   vargs.add("--participantId " + containerId.stringify());
+
+   vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stdout");
+   vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stderr");
+
+   // Get final command: all pieces joined into one space-separated line
+   StringBuilder command = new StringBuilder();
+   for (String arg : vargs) {
+     command.append(arg).append(" ");
+   }
+
+   LOG.info("Completed setting up container participant command " + command);
+   List<String> commands = new ArrayList<String>();
+   commands.add(command.toString());
+   participantContainer.setCommands(commands);
+   return participantContainer;
+ }
+
+ /** Returns the value of a required environment variable, failing fast when it is not set. */
+ private static String requireEnv(Map<String, String> envs, String name) {
+   String value = envs.get(name);
+   if (value == null) {
+     throw new IllegalStateException("Required environment variable is not set: " + name);
+   }
+   return value;
+ }
+
@Override
- public boolean stopContainer(ContainerId containerId) {
+ public ListenableFuture<Boolean> stopContainer(final ContainerId containerId) {
Container container = allocatedContainersMap.get(containerId);
- Future<ContainerStopResponse> stopContainer = applicationMaster.stopContainer(container);
- try {
- ContainerStopResponse containerStopResponse = stopContainer.get();
- return true;
- } catch (Exception e) {
- LOG.error("Exception while stopping container containerId:" + containerId, e);
- }
- return false;
+ ListenableFuture<ContainerStopResponse> future = applicationMaster.stopContainer(container);
+ return Futures.transform(future, new Function<ContainerStopResponse, Boolean>() {
+ @Override
+ public Boolean apply(ContainerStopResponse response) {
+ return response != null;
+ }
+ }, service);
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/f282a300/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java
index 0000000,0000000..8427c14
new file mode 100644
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java
@@@ -1,0 -1,0 +1,46 @@@
++package org.apache.helix.provisioning.yarn;
++
++import org.apache.helix.api.id.ResourceId;
++import org.apache.helix.controller.provisioner.ProvisionerConfig;
++import org.apache.helix.controller.provisioner.ProvisionerRef;
++import org.apache.helix.controller.serializer.DefaultStringSerializer;
++import org.apache.helix.controller.serializer.StringSerializer;
++import org.apache.helix.integration.TestLocalContainerProvider.LocalProvisioner;
++import org.codehaus.jackson.annotate.JsonProperty;
++
++/**
++ * {@link ProvisionerConfig} implementation whose provisioner reference initially names
++ * {@link YarnProvisioner} and whose serializer class initially is
++ * {@link DefaultStringSerializer}; both can be replaced through the setters.
++ */
++public class YarnProvisionerConfig implements ProvisionerConfig {
++
++  private ResourceId _resourceId;
++  private Class<? extends StringSerializer> _serializerClass;
++  private ProvisionerRef _provisionerRef;
++
++  /**
++   * Creates a config for a resource with the default serializer class and a provisioner
++   * reference built from the {@link YarnProvisioner} class name.
++   * @param resourceId the id later returned by {@link #getResourceId()}
++   */
++  public YarnProvisionerConfig(@JsonProperty("resourceId") ResourceId resourceId) {
++    this._resourceId = resourceId;
++    this._provisionerRef = ProvisionerRef.from(YarnProvisioner.class.getName());
++    this._serializerClass = DefaultStringSerializer.class;
++  }
++
++  /**
++   * @return the resource id given at construction time
++   */
++  @Override
++  public ResourceId getResourceId() {
++    return this._resourceId;
++  }
++
++  /**
++   * @return the currently configured provisioner reference
++   */
++  @Override
++  public ProvisionerRef getProvisionerRef() {
++    return this._provisionerRef;
++  }
++
++  /**
++   * Replaces the provisioner reference.
++   * @param provisionerRef the new reference
++   */
++  public void setProvisionerRef(ProvisionerRef provisionerRef) {
++    this._provisionerRef = provisionerRef;
++  }
++
++  /**
++   * @return the currently configured serializer class
++   */
++  @Override
++  public Class<? extends StringSerializer> getSerializerClass() {
++    return this._serializerClass;
++  }
++
++  /**
++   * Replaces the serializer class.
++   * @param serializerClass the new serializer class
++   */
++  public void setSerializerClass(Class<? extends StringSerializer> serializerClass) {
++    this._serializerClass = serializerClass;
++  }
++
++}