You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2014/01/10 16:27:09 UTC

[2/2] git commit: resolving conflicts

resolving conflicts


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f282a300
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f282a300
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f282a300

Branch: refs/heads/helix-provisioning
Commit: f282a30039223a2068350cac12596b55c55dedca
Parents: 9a1ba91 27f6272
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Fri Jan 10 07:26:58 2014 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Fri Jan 10 07:26:58 2014 -0800

----------------------------------------------------------------------
 .../provisioner/ContainerProvider.java          |  10 +-
 .../controller/provisioner/ContainerState.java  |   4 +-
 .../stages/ContainerProvisioningStage.java      | 150 +++++++++++++------
 .../integration/TestLocalContainerProvider.java |  28 +++-
 .../provisioning/yarn/ContainerParticipant.java |  15 ++
 .../yarn/GenericApplicationMaster.java          |  27 ++--
 .../provisioning/yarn/YarnProvisioner.java      |  89 ++++++-----
 .../yarn/YarnProvisionerConfig.java             |  46 ++++++
 8 files changed, 254 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/f282a300/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
index 0000000,0000000..b0272e8
new file mode 100644
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
@@@ -1,0 -1,0 +1,15 @@@
++package org.apache.helix.provisioning.yarn;
++
++import java.util.Arrays;
++
++import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
++
++public class ContainerParticipant {
++  private static final Log LOG = LogFactory.getLog(ContainerParticipant.class);
++
++  public static void main(String[] args) throws InterruptedException {
++    LOG.info("Starting participant: "+ Arrays.toString(args));
++    Thread.currentThread().join();
++  }
++}

http://git-wip-us.apache.org/repos/asf/helix/blob/f282a300/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
index 1b27378,3b4e937..3adffd6
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
@@@ -98,17 -93,20 +94,15 @@@ public class GenericApplicationMaster 
    // Tracking url to which app master publishes info for clients to monitor
    private String appMasterTrackingUrl = "";
  
- 
 -  // Counter for completed containers ( complete denotes successful or failed )
 -  AtomicInteger numCompletedContainers = new AtomicInteger();
 -  // Allocated container count so that we know how many containers has the RM
 -  // allocated to us
 -  AtomicInteger numAllocatedContainers = new AtomicInteger();
 -  // Count of failed containers
 -  AtomicInteger numFailedContainers = new AtomicInteger();
 -  // Count of containers already requested from the RM
 -  // Needed as once requested, we should not request for containers again.
 -  // Only request for more if the original requirement changes.
 -  AtomicInteger numRequestedContainers = new AtomicInteger();
    Map<ContainerRequest, SettableFuture<ContainerAskResponse>> containerRequestMap =
        new LinkedHashMap<AMRMClient.ContainerRequest, SettableFuture<ContainerAskResponse>>();
 +  Map<ContainerId, SettableFuture<ContainerReleaseResponse>> containerReleaseMap =
 +      new LinkedHashMap<ContainerId, SettableFuture<ContainerReleaseResponse>>();
 +  Map<ContainerId, SettableFuture<ContainerStopResponse>> containerStopMap =
 +      new LinkedHashMap<ContainerId, SettableFuture<ContainerStopResponse>>();
 +  Map<ContainerId, SettableFuture<ContainerLaunchResponse>> containerLaunchResponseMap =
 +      new LinkedHashMap<ContainerId, SettableFuture<ContainerLaunchResponse>>();
  
-   
    ByteBuffer allTokens;
  
    // Launch threads
@@@ -120,7 -117,7 +113,7 @@@
      // Set up the configuration
      conf = new YarnConfiguration();
    }
--  
++
    /**
     * Dump out contents of $CWD and the environment to stdout for debugging
     */
@@@ -154,15 -151,15 +147,13 @@@
      }
    }
  
--
--
    /**
     * Parse command line options
     * @param args Command line args
     * @return Whether init successful and run should be invoked
     * @throws ParseException
     * @throws IOException
--   * @throws YarnException 
++   * @throws YarnException
     */
    public boolean start() throws ParseException, IOException, YarnException {
  
@@@ -234,33 -231,30 +225,35 @@@
      return true;
    }
  
--
-   public Future<ContainerAskResponse> acquireContainer(ContainerRequest containerAsk) {
+   public ListenableFuture<ContainerAskResponse> acquireContainer(ContainerRequest containerAsk) {
+     amRMClient.addContainerRequest(containerAsk);
 -    numRequestedContainers.incrementAndGet();
      SettableFuture<ContainerAskResponse> future = SettableFuture.create();
 +    containerRequestMap.put(containerAsk, future);
 +    amRMClient.addContainerRequest(containerAsk);
      return future;
    }
  
-   public Future<ContainerStopResponse> stopContainer(Container container) {
+   public ListenableFuture<ContainerStopResponse> stopContainer(Container container) {
+     nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
      SettableFuture<ContainerStopResponse> future = SettableFuture.create();
 +    containerStopMap.put(container.getId(), future);
 +    nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
      return future;
    }
  
-   public Future<ContainerReleaseResponse> releaseContainer(Container container) {
+   public ListenableFuture<ContainerReleaseResponse> releaseContainer(Container container) {
+     amRMClient.releaseAssignedContainer(container.getId());
      SettableFuture<ContainerReleaseResponse> future = SettableFuture.create();
 +    containerReleaseMap.put(container.getId(), future);
 +    amRMClient.releaseAssignedContainer(container.getId());
      return future;
    }
  
-   public Future<ContainerLaunchResponse> launchContainer(Container container,
+   public ListenableFuture<ContainerLaunchResponse> launchContainer(Container container,
        ContainerLaunchContext containerLaunchContext) {
 -    nmClientAsync.startContainerAsync(container, containerLaunchContext);
      SettableFuture<ContainerLaunchResponse> future = SettableFuture.create();
 +    containerLaunchResponseMap.put(container.getId(), future);
 +    nmClientAsync.startContainerAsync(container, containerLaunchContext);
      return future;
    }
  

http://git-wip-us.apache.org/repos/asf/helix/blob/f282a300/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
index f983255,e921c87..f74e312
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
@@@ -1,38 -1,14 +1,39 @@@
  package org.apache.helix.provisioning.yarn;
  
 +import java.io.File;
 +import java.io.FileInputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
  import java.util.Collection;
  import java.util.HashMap;
 +import java.util.List;
  import java.util.Map;
 +import java.util.Vector;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.Future;
+ import java.util.concurrent.Executors;
  
 +import org.apache.commons.compress.archivers.ArchiveStreamFactory;
 +import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.DataOutputBuffer;
 +import org.apache.hadoop.security.Credentials;
 +import org.apache.hadoop.security.UserGroupInformation;
 +import org.apache.hadoop.security.token.Token;
 +import org.apache.hadoop.yarn.api.ApplicationConstants;
 +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
  import org.apache.hadoop.yarn.api.records.Container;
  import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 +import org.apache.hadoop.yarn.api.records.LocalResource;
 +import org.apache.hadoop.yarn.api.records.LocalResourceType;
 +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
  import org.apache.hadoop.yarn.api.records.Priority;
  import org.apache.hadoop.yarn.api.records.Resource;
  import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@@@ -50,178 -23,70 +51,190 @@@ import org.apache.helix.controller.prov
  import org.apache.helix.controller.provisioner.Provisioner;
  import org.apache.helix.controller.provisioner.TargetProviderResponse;
  
 +import com.google.common.collect.Lists;
+ import com.google.common.base.Function;
+ import com.google.common.util.concurrent.Futures;
+ import com.google.common.util.concurrent.ListenableFuture;
+ import com.google.common.util.concurrent.ListeningExecutorService;
+ import com.google.common.util.concurrent.MoreExecutors;
  
  public class YarnProvisioner implements Provisioner {
  
    private static final Log LOG = LogFactory.getLog(YarnProvisioner.class);
    static GenericApplicationMaster applicationMaster;
 -  static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
++  static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors
++      .newCachedThreadPool());
    Map<ContainerId, Container> allocatedContainersMap = new HashMap<ContainerId, Container>();
 +  int DEFAULT_CONTAINER = 4;
  
    @Override
-   public ContainerId allocateContainer(ContainerSpec spec) {
+   public ListenableFuture<ContainerId> allocateContainer(ContainerSpec spec) {
      ContainerRequest containerAsk = setupContainerAskForRM(spec);
-     Future<ContainerAskResponse> requestNewContainer =
+     ListenableFuture<ContainerAskResponse> requestNewContainer =
          applicationMaster.acquireContainer(containerAsk);
-     ContainerAskResponse containerAskResponse;
-     try {
-       containerAskResponse = requestNewContainer.get();
-       ContainerId helixContainerId =
-           ContainerId.from(containerAskResponse.getContainer().getId().toString());
-       allocatedContainersMap.put(helixContainerId, containerAskResponse.getContainer());
-       return helixContainerId;
-     } catch (Exception e) {
-       LOG.error("Exception in allocateContainer for spec:" + spec, e);
-     }
-     return null;
 -    return Futures.transform(requestNewContainer, new Function<ContainerAskResponse, ContainerId>() {
 -      @Override
 -      public ContainerId apply(ContainerAskResponse containerAskResponse) {
 -        ContainerId helixContainerId =
 -            ContainerId.from(containerAskResponse.getContainer().getId().toString());
 -        allocatedContainersMap.put(helixContainerId, containerAskResponse.getContainer());
 -        return helixContainerId;
 -      }
 -    });
++    return Futures.transform(requestNewContainer,
++        new Function<ContainerAskResponse, ContainerId>() {
++          @Override
++          public ContainerId apply(ContainerAskResponse containerAskResponse) {
++            ContainerId helixContainerId =
++                ContainerId.from(containerAskResponse.getContainer().getId().toString());
++            allocatedContainersMap.put(helixContainerId, containerAskResponse.getContainer());
++            return helixContainerId;
++          }
++        });
++
    }
  
    @Override
-   public boolean deallocateContainer(ContainerId containerId) {
-     Future<ContainerReleaseResponse> releaseContainer =
+   public ListenableFuture<Boolean> deallocateContainer(ContainerId containerId) {
+     ListenableFuture<ContainerReleaseResponse> releaseContainer =
          applicationMaster.releaseContainer(allocatedContainersMap.get(containerId));
-     try {
-       releaseContainer.get();
-       return true;
-     } catch (Exception e) {
-       LOG.error("Exception in deallocateContainer containerId:" + containerId, e);
-     }
-     return false;
+     return Futures.transform(releaseContainer, new Function<ContainerReleaseResponse, Boolean>() {
+       @Override
+       public Boolean apply(ContainerReleaseResponse response) {
+         return response != null;
+       }
+     }, service);
++
    }
  
    @Override
-   public boolean startContainer(ContainerId containerId) {
+   public ListenableFuture<Boolean> startContainer(final ContainerId containerId) {
      Container container = allocatedContainersMap.get(containerId);
 -    ContainerLaunchContext containerLaunchContext = Records.newRecord(ContainerLaunchContext.class);
 -    ListenableFuture<ContainerLaunchResponse> future = applicationMaster.launchContainer(container, containerLaunchContext);
++    ContainerLaunchContext launchContext;
 +    try {
-       Future<ContainerLaunchResponse> launchContainer =
-           applicationMaster.launchContainer(container, createLaunchContext(containerId));
-       ContainerLaunchResponse containerLaunchResponse = launchContainer.get();
-       return true;
++      launchContext = createLaunchContext(containerId);
 +    } catch (Exception e) {
-       LOG.error("Exception while starting container containerId:" + containerId, e);
++      LOG.error("Exception while creating context to launch container:" + containerId, e);
++      return null;
 +    }
-     return false;
++    ListenableFuture<ContainerLaunchResponse> future =
++        applicationMaster.launchContainer(container, launchContext);
+     return Futures.transform(future, new Function<ContainerLaunchResponse, Boolean>() {
+       @Override
+       public Boolean apply(ContainerLaunchResponse response) {
+         return response != null;
+       }
+     }, service);
    }
  
 +  private ContainerLaunchContext createLaunchContext(ContainerId containerId) throws Exception {
 +
 +    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
 +
 +    Map<String, String> envs = System.getenv();
 +    String appName = envs.get("appName");
 +    String appId = envs.get("appId");
 +    String appClasspath = envs.get("appClasspath");
 +    String containerParticipantMainClass = envs.get("containerParticipantMainClass");
 +    String zkAddress = envs.get(Environment.NM_HOST.name()) + ":2181";
 +
 +    // set the localresources needed to launch container
 +    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
 +
 +    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
 +    YarnConfiguration conf = new YarnConfiguration();
 +    FileSystem fs;
 +    fs = FileSystem.get(conf);
 +    String pathSuffix = appName + "/" + appId + "/app-pkg.tar";
 +    Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
 +    FileStatus destStatus = fs.getFileStatus(dst);
 +
 +    // Set the type of resource - file or archive
 +    // archives are untarred at destination
 +    // the app package is a tar (app-pkg.tar), so it is registered as an archive
 +    amJarRsrc.setType(LocalResourceType.ARCHIVE);
 +    // Set visibility of the resource
 +    // Setting to most private option
 +    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
 +    // Set the resource to be copied over
 +    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
 +    // Set timestamp and length of file so that the framework
 +    // can do basic sanity checks for the local resource
 +    // after it has been copied over to ensure it is the same
 +    // resource the client intended to use with the application
 +    amJarRsrc.setTimestamp(destStatus.getModificationTime());
 +    amJarRsrc.setSize(destStatus.getLen());
 +    localResources.put("app-pkg", amJarRsrc);
 +
 +    // Set local resource info into app master container launch context
 +    amContainer.setLocalResources(localResources);
 +
 +    // Set the necessary security tokens as needed
 +    // amContainer.setContainerTokens(containerToken);
 +
 +    // Set the env variables to be setup in the env where the application master will be run
 +    LOG.info("Set the environment for the application master");
 +    Map<String, String> env = new HashMap<String, String>();
 +    env.put("app-pkg-path", dst.getName());
 +    // Add AppMaster.jar location to classpath
 +    // At some point we should not be required to add
 +    // the hadoop specific classpaths to the env.
 +    // It should be provided out of the box.
 +    // For now setting all required classpaths including
 +    // the classpath to "." for the application jar
 +    StringBuilder classPathEnv =
 +        new StringBuilder(Environment.CLASSPATH.$()).append(File.pathSeparatorChar).append("./*");
 +    classPathEnv.append(File.pathSeparatorChar);
 +    classPathEnv.append(appClasspath);
 +
 +    for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
 +        YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
 +      classPathEnv.append(File.pathSeparatorChar);
 +      classPathEnv.append(c.trim());
 +    }
 +    classPathEnv.append(File.pathSeparatorChar).append("./log4j.properties");
 +
 +    // add the runtime classpath needed for tests to work
 +    if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
 +      classPathEnv.append(':');
 +      classPathEnv.append(System.getProperty("java.class.path"));
 +    }
 +    System.out.println("classoath" + classPathEnv.toString());
 +    env.put("CLASSPATH", classPathEnv.toString());
 +
 +    amContainer.setEnvironment(env);
 +
 +    // Set the necessary command to execute the application master
 +    Vector<CharSequence> vargs = new Vector<CharSequence>(30);
 +
 +    // Set java executable command
 +    LOG.info("Setting up app master command");
 +    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
 +    // Set Xmx based on am memory size
 +    vargs.add("-Xmx" + 1024 + "m");
 +    // Set class name
 +    vargs.add(containerParticipantMainClass);
 +    // Set params for container participant
 +    vargs.add("--zk_address " + zkAddress);
 +    vargs.add("--participantId " + containerId.stringify());
 +
 +    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stdout");
 +    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stderr");
 +
 +    // Get final command
 +    StringBuilder command = new StringBuilder();
 +    for (CharSequence str : vargs) {
 +      command.append(str).append(" ");
 +    }
 +
 +    LOG.info("Completed setting up app master command " + command.toString());
 +    List<String> commands = new ArrayList<String>();
 +    commands.add(command.toString());
 +    amContainer.setCommands(commands);
 +    return amContainer;
 +  }
 +
    @Override
-   public boolean stopContainer(ContainerId containerId) {
+   public ListenableFuture<Boolean> stopContainer(final ContainerId containerId) {
      Container container = allocatedContainersMap.get(containerId);
-     Future<ContainerStopResponse> stopContainer = applicationMaster.stopContainer(container);
-     try {
-       ContainerStopResponse containerStopResponse = stopContainer.get();
-       return true;
-     } catch (Exception e) {
-       LOG.error("Exception while stopping container containerId:" + containerId, e);
-     }
-     return false;
+     ListenableFuture<ContainerStopResponse> future = applicationMaster.stopContainer(container);
+     return Futures.transform(future, new Function<ContainerStopResponse, Boolean>() {
+       @Override
+       public Boolean apply(ContainerStopResponse response) {
+         return response != null;
+       }
+     }, service);
    }
  
    @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/f282a300/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java
index 0000000,0000000..8427c14
new file mode 100644
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java
@@@ -1,0 -1,0 +1,46 @@@
++package org.apache.helix.provisioning.yarn;
++
++import org.apache.helix.api.id.ResourceId;
++import org.apache.helix.controller.provisioner.ProvisionerConfig;
++import org.apache.helix.controller.provisioner.ProvisionerRef;
++import org.apache.helix.controller.serializer.DefaultStringSerializer;
++import org.apache.helix.controller.serializer.StringSerializer;
++import org.apache.helix.integration.TestLocalContainerProvider.LocalProvisioner;
++import org.codehaus.jackson.annotate.JsonProperty;
++
++public class YarnProvisionerConfig implements ProvisionerConfig {
++
++  private ResourceId _resourceId;
++  private Class<? extends StringSerializer> _serializerClass;
++  private ProvisionerRef _provisionerRef;
++
++  public YarnProvisionerConfig(@JsonProperty("resourceId") ResourceId resourceId) {
++    _resourceId = resourceId;
++    _serializerClass = DefaultStringSerializer.class;
++    _provisionerRef = ProvisionerRef.from(YarnProvisioner.class.getName());
++  }
++
++  @Override
++  public ResourceId getResourceId() {
++    return _resourceId;
++  }
++
++  @Override
++  public ProvisionerRef getProvisionerRef() {
++    return _provisionerRef;
++  }
++
++  public void setProvisionerRef(ProvisionerRef provisionerRef) {
++    _provisionerRef = provisionerRef;
++  }
++
++  @Override
++  public Class<? extends StringSerializer> getSerializerClass() {
++    return _serializerClass;
++  }
++
++  public void setSerializerClass(Class<? extends StringSerializer> serializerClass) {
++    _serializerClass = serializerClass;
++  }
++
++}