You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/09/20 20:30:20 UTC
[11/15] Adding Helix-task-framework and Yarn integration modules
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProvider.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProvider.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProvider.java
new file mode 100644
index 0000000..d490edc
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProvider.java
@@ -0,0 +1,143 @@
+package org.apache.helix.autoscale.impl.yarn;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.helix.autoscale.ContainerProvider;
+import org.apache.helix.autoscale.ContainerProviderService;
+import org.apache.helix.autoscale.impl.yarn.YarnContainerData.ContainerState;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * {@link ContainerProvider} spawning YARN-based containers. Reads and writes
+ * meta data using {@link YarnDataProvider}. Works in a distributed setting, but
+ * typically requires access to zookeeper.
+ *
+ */
+class YarnContainerProvider implements ContainerProviderService {
+
+ static final Logger log = Logger.getLogger(YarnContainerProvider.class);
+
+ static final long POLL_INTERVAL = 1000;
+ static final long CONTAINER_TIMEOUT = 60000;
+
+ /*
+ * CONTAINERS
+ * A (A, READY)
+ * B (B, RUNNING)
+ */
+
+ final Object notifier = new Object();
+ final Map<String, Properties> types = new HashMap<String, Properties>();
+
+ ZookeeperYarnDataProvider yarnDataService;
+ YarnContainerProviderProcess yarnApp;
+ YarnContainerProviderProperties properties;
+
+ @Override
+ public void configure(Properties properties) throws Exception {
+ YarnContainerProviderProperties yarnProps = new YarnContainerProviderProperties();
+ yarnProps.putAll(properties);
+ configure(yarnProps);
+ }
+
+ private void configure(YarnContainerProviderProperties properties) {
+ this.properties = properties;
+
+ for(String containerType : properties.getContainers()) {
+ registerType(containerType, properties.getContainer(containerType));
+ }
+ }
+
+ @Override
+ public void start() throws Exception {
+ Preconditions.checkNotNull(properties);
+ Preconditions.checkState(properties.isValid(), "provider properties not valid: %s", properties);
+
+ log.debug("Starting yarn container provider service");
+ yarnDataService = new ZookeeperYarnDataProvider();
+ yarnDataService.configure(properties);
+ yarnDataService.start();
+ }
+
+ @Override
+ public void stop() throws Exception {
+ log.debug("Stopping yarn container provider service");
+ destroyAll();
+
+ if(yarnDataService != null) {
+ yarnDataService.stop();
+ yarnDataService = null;
+ }
+ }
+
+ @Override
+ public void create(final String id, final String type) throws Exception {
+ Preconditions.checkArgument(types.containsKey(type), "Container type '%s' is not configured", type);
+
+ YarnContainerProcessProperties containerProperties = YarnUtils.createContainerProcessProperties(types.get(type));
+
+ log.info(String.format("Running container '%s' (properties='%s')", id, containerProperties));
+
+ yarnDataService.create(new YarnContainerData(id, properties.getName(), containerProperties));
+ waitForState(id, ContainerState.ACTIVE);
+ }
+
+ @Override
+ public void destroy(final String id) throws Exception {
+ YarnContainerData meta = yarnDataService.read(id);
+
+ if(meta.state == ContainerState.ACTIVE) {
+ log.info(String.format("Destroying active container, going to teardown"));
+ yarnDataService.update(meta.setState(ContainerState.TEARDOWN));
+
+ } else if(meta.state == ContainerState.FAILED) {
+ log.info(String.format("Destroying failed container, going to teardown"));
+ yarnDataService.update(meta.setState(ContainerState.TEARDOWN));
+
+ } else if(meta.state == ContainerState.FINALIZE) {
+ log.info(String.format("Destroying finalized container, skipping"));
+
+ } else {
+ throw new IllegalStateException(String.format("Container '%s' must be active, failed or finalized", id));
+ }
+
+ waitForState(id, ContainerState.FINALIZE);
+ yarnDataService.delete(id);
+ }
+
+ @Override
+ public void destroyAll() {
+ try {
+ for(YarnContainerData meta : yarnDataService.readAll()) {
+ if(meta.owner.equals(properties.getName())) {
+ try { destroy(meta.id); } catch (Exception ignore) {}
+ }
+ }
+ } catch (Exception ignore) {
+ // ignore
+ }
+ }
+
+ void waitForState(String id, ContainerState state) throws Exception, InterruptedException, TimeoutException {
+ long limit = System.currentTimeMillis() + CONTAINER_TIMEOUT;
+ YarnContainerData meta = yarnDataService.read(id);
+ while(meta.state != state) {
+ if(System.currentTimeMillis() >= limit) {
+ throw new TimeoutException(String.format("Container '%s' failed to reach state '%s' (currently is '%s')", id, state, meta.state));
+ }
+ Thread.sleep(POLL_INTERVAL);
+ meta = yarnDataService.read(id);
+ }
+ }
+
+ void registerType(String name, Properties properties) {
+ log.debug(String.format("Registering container type '%s' (properties='%s')", name, properties));
+ types.put(name, properties);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProcess.java
new file mode 100644
index 0000000..20a8b92
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProcess.java
@@ -0,0 +1,158 @@
+package org.apache.helix.autoscale.impl.yarn;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.helix.autoscale.Service;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Configurable and runnable service for {@link YarnContainerProvider}
+ *
+ */
+public class YarnContainerProviderProcess implements Service {
+
+ static final Logger log = Logger.getLogger(YarnContainerProviderProcess.class);
+
+ static String YARN_MASTER_COMMAND = "/bin/sh %s 1>%s/stdout 2>%s/stderr";
+
+ Configuration conf;
+ YarnRPC rpc;
+ ClientRMProtocol rmClient;
+ ApplicationId appId;
+ File propertiesFile;
+
+ YarnContainerProviderProperties properties;
+
+ @Override
+ public void configure(Properties properties) throws Exception {
+ configure(YarnUtils.createContainerProviderProperties(properties));
+ }
+
+ private void configure(YarnContainerProviderProperties properties) {
+ this.conf = new YarnConfiguration();
+ this.conf.set(YarnConfiguration.RM_ADDRESS, properties.getResourceManager());
+ this.conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, properties.getScheduler());
+ this.conf.set(FileSystem.FS_DEFAULT_NAME_KEY, properties.getHdfs());
+
+ this.rpc = YarnRPC.create(conf);
+
+ this.properties = properties;
+ }
+
+ @Override
+ public void start() throws Exception {
+ Preconditions.checkNotNull(properties);
+ Preconditions.checkState(properties.isValid());
+
+ connect();
+
+ String command = String.format(YARN_MASTER_COMMAND, YarnUtils.YARN_MASTER_PATH, ApplicationConstants.LOG_DIR_EXPANSION_VAR,
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+
+ log.info(String.format("Starting application '%s' provider '%s' (masterCommand='%s')", properties.getYarnData(), properties.getName(), command));
+
+ log.debug(String.format("Running master command \"%s\"", command));
+
+ // app id
+ GetNewApplicationRequest appRequest = Records.newRecord(GetNewApplicationRequest.class);
+ GetNewApplicationResponse appResponse = rmClient.getNewApplication(appRequest);
+
+ this.appId = appResponse.getApplicationId();
+
+ log.info(String.format("Acquired app id '%s' for '%s'", appId.toString(), properties.getName()));
+
+ // command
+ ContainerLaunchContext launchContext = Records.newRecord(ContainerLaunchContext.class);
+ launchContext.setCommands(Collections.singletonList(command));
+
+ // resource limit
+ Resource resource = Records.newRecord(Resource.class);
+ resource.setMemory(256); // TODO make dynamic
+ launchContext.setResource(resource);
+
+ // environment
+ Map<String, String> env = new HashMap<String, String>();
+ launchContext.setEnvironment(env);
+
+ // configuration
+ propertiesFile = YarnUtils.writePropertiesToTemp(properties);
+
+ // HDFS
+ final String namespace = appId.toString();
+ final Path masterArchive = YarnUtils.copyToHdfs(YarnUtils.YARN_MASTER_ARCHIVE_PATH, YarnUtils.YARN_MASTER_STAGING, namespace, conf);
+ final Path masterProperties = YarnUtils.copyToHdfs(propertiesFile.getCanonicalPath(), YarnUtils.YARN_MASTER_PROPERTIES, namespace, conf);
+ final Path containerArchive = YarnUtils.copyToHdfs(YarnUtils.YARN_CONTAINER_ARCHIVE_PATH, YarnUtils.YARN_CONTAINER_STAGING, namespace, conf);
+
+ // local resources
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ localResources.put(YarnUtils.YARN_MASTER_DESTINATION, YarnUtils.createHdfsResource(masterArchive, LocalResourceType.ARCHIVE, conf));
+ localResources.put(YarnUtils.YARN_MASTER_PROPERTIES, YarnUtils.createHdfsResource(masterProperties, LocalResourceType.FILE, conf));
+ localResources.put(YarnUtils.YARN_CONTAINER_STAGING, YarnUtils.createHdfsResource(containerArchive, LocalResourceType.FILE, conf));
+
+ launchContext.setLocalResources(localResources);
+
+ // user
+ launchContext.setUser(properties.getUser());
+
+ // app submission
+ ApplicationSubmissionContext subContext = Records.newRecord(ApplicationSubmissionContext.class);
+ subContext.setApplicationId(appId);
+ subContext.setApplicationName(properties.getName());
+ subContext.setAMContainerSpec(launchContext);
+
+ SubmitApplicationRequest subRequest = Records.newRecord(SubmitApplicationRequest.class);
+ subRequest.setApplicationSubmissionContext(subContext);
+
+ log.info(String.format("Starting app id '%s'", appId.toString()));
+
+ rmClient.submitApplication(subRequest);
+
+ }
+
+ @Override
+ public void stop() throws YarnRemoteException {
+ log.info(String.format("Stopping app id '%s'", appId.toString()));
+ KillApplicationRequest killRequest = Records.newRecord(KillApplicationRequest.class);
+ killRequest.setApplicationId(appId);
+
+ rmClient.forceKillApplication(killRequest);
+
+ try { YarnUtils.destroyHdfsNamespace(appId.toString(), conf); } catch(Exception ignore) {}
+
+ propertiesFile.delete();
+ }
+
+ void connect() {
+ YarnConfiguration yarnConf = new YarnConfiguration(conf);
+ InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS));
+ log.info("Connecting to ResourceManager at: " + rmAddress);
+ this.rmClient = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, conf));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProperties.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProperties.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProperties.java
new file mode 100644
index 0000000..85c8ab5
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProperties.java
@@ -0,0 +1,64 @@
+package org.apache.helix.autoscale.impl.yarn;
+
+import org.apache.helix.autoscale.provider.ProviderProperties;
+
+import com.google.common.base.Preconditions;
+
/**
 * Base configuration for {@link YarnContainerProviderProcess}. Extends the
 * generic provider properties with YARN-specific keys (zookeeper yarndata
 * endpoint, resource manager, scheduler, submitting user, HDFS url).
 *
 */
public class YarnContainerProviderProperties extends ProviderProperties {
    /**
     * Serialization id (inherited Properties hierarchy is Serializable).
     */
    private static final long serialVersionUID = -8853614843205587170L;

    public final static String YARNDATA = "yarndata";
    // NOTE(review): key is misspelled ("resourcemananger"), but it is used
    // consistently via getResourceManager(); renaming it would break existing
    // configuration files — TODO fix the spelling in a coordinated change
    public final static String RESOURCEMANAGER = "resourcemananger";
    public final static String SCHEDULER = "scheduler";
    public final static String USER = "user";
    public final static String HDFS = "hdfs";

    // valid only if the parent properties are valid AND all YARN keys are set
    public boolean isValid() {
        return super.isValid() &&
               containsKey(YARNDATA) &&
               containsKey(RESOURCEMANAGER) &&
               containsKey(SCHEDULER) &&
               containsKey(USER) &&
               containsKey(HDFS);
    }

    /** Zookeeper-backed yarndata endpoint. */
    public String getYarnData() {
        return getProperty(YARNDATA);
    }

    /** Resource manager address (host:port). */
    public String getResourceManager() {
        return getProperty(RESOURCEMANAGER);
    }

    /** Scheduler address (host:port). */
    public String getScheduler() {
        return getProperty(SCHEDULER);
    }

    /** User the YARN application is submitted as. */
    public String getUser() {
        return getProperty(USER);
    }

    /** HDFS namenode url. */
    public String getHdfs() {
        return getProperty(HDFS);
    }

    /**
     * {@inheritDoc}
     *
     * Unlike {@link java.util.Properties#getProperty}, throws
     * IllegalStateException instead of returning null for a missing key.
     */
    @Override
    public String getProperty(String key) {
        Preconditions.checkState(containsKey(key));
        return super.getProperty(key);
    }

    /**
     * {@inheritDoc}
     *
     * Throws IllegalStateException instead of returning null for a missing key.
     */
    @Override
    public Object get(Object key) {
        Preconditions.checkState(containsKey(key));
        return super.get(key);
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerService.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerService.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerService.java
new file mode 100644
index 0000000..e730c25
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerService.java
@@ -0,0 +1,156 @@
+package org.apache.helix.autoscale.impl.yarn;
+
+import java.io.File;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.autoscale.Service;
+import org.apache.helix.autoscale.container.ContainerProcess;
+import org.apache.helix.autoscale.container.ContainerProcessProperties;
+import org.apache.helix.autoscale.container.ContainerUtils;
+import org.apache.helix.autoscale.impl.yarn.YarnContainerData.ContainerState;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Configurable and runnable service for YARN-based containers. Continuously
+ * checks container meta data and process state and triggers state changes and
+ * container setup and shutdown.
+ *
+ */
+class YarnContainerService implements Service {
+ static final Logger log = Logger.getLogger(YarnContainerService.class);
+
+ static final long CONTAINERSERVICE_INTERVAL = 1000;
+
+ YarnContainerProcessProperties properties;
+
+ YarnDataProvider metaService;
+ ScheduledExecutorService executor;
+
+ ContainerProcess process;
+
+ @Override
+ public void configure(Properties properties) throws Exception {
+ Preconditions.checkNotNull(properties);
+ YarnContainerProcessProperties containerProperties = new YarnContainerProcessProperties();
+ containerProperties.putAll(properties);
+ Preconditions.checkArgument(containerProperties.isValid());
+
+ this.properties = containerProperties;
+ }
+
+ public void setYarnDataProvider(YarnDataProvider metaService) {
+ this.metaService = metaService;
+ }
+
+ @Override
+ public void start() {
+ Preconditions.checkNotNull(metaService);
+ Preconditions.checkNotNull(properties);
+ Preconditions.checkState(properties.isValid());
+
+ log.debug("starting yarn container service");
+
+ executor = Executors.newSingleThreadScheduledExecutor();
+ executor.scheduleAtFixedRate(new ContainerStatusService(), 0, CONTAINERSERVICE_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void stop() {
+ log.debug("stopping yarn container service");
+
+ if (executor != null) {
+ executor.shutdown();
+ while (!executor.isTerminated()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ executor = null;
+ }
+
+ destroyLocalContainerNamespace();
+ }
+
+ class ContainerStatusService implements Runnable {
+ @Override
+ public void run() {
+ log.info("updating container status");
+
+ try {
+ if (!metaService.exists(properties.getName())) {
+ log.warn(String.format("YarnData for '%s' does not exist. Terminating yarn service.", properties.getName()));
+ process.stop();
+ stop();
+ }
+
+ YarnContainerData meta = metaService.read(properties.getName());
+
+ if (meta.state == ContainerState.CONNECTING) {
+ log.trace("container connecting");
+ try {
+ ContainerProcessProperties containerProperties = meta.getProperties();
+
+ containerProperties.setProperty(ContainerProcessProperties.CLUSTER, properties.getCluster());
+ containerProperties.setProperty(ContainerProcessProperties.ADDRESS, properties.getAddress());
+ containerProperties.setProperty(ContainerProcessProperties.NAME, properties.getName());
+
+ process = ContainerUtils.createProcess(containerProperties);
+ process.start();
+ } catch (Exception e) {
+ log.error("Failed to start participant, going to failed", e);
+ }
+
+ if (process.isActive()) {
+ log.trace("process active, activating container");
+ metaService.update(meta.setState(ContainerState.ACTIVE));
+
+ } else if (process.isFailed()) {
+ log.trace("process failed, failing container");
+ metaService.update(meta.setState(ContainerState.FAILED));
+
+ } else {
+ log.trace("process state unknown, failing container");
+ metaService.update(meta.setState(ContainerState.FAILED));
+ }
+ }
+
+ if (meta.state == ContainerState.ACTIVE) {
+ log.trace("container active");
+ if (process.isFailed()) {
+ log.trace("process failed, failing container");
+ metaService.update(meta.setState(ContainerState.FAILED));
+
+ } else if (!process.isActive()) {
+ log.trace("process not active, halting container");
+ process.stop();
+ metaService.update(meta.setState(ContainerState.HALTED));
+ }
+ }
+
+ if (meta.state == ContainerState.TEARDOWN) {
+ log.trace("container teardown");
+ process.stop();
+ metaService.update(meta.setState(ContainerState.HALTED));
+ }
+
+ } catch (Exception e) {
+ log.error(String.format("Error while updating container '%s' status", properties.getName()), e);
+ }
+ }
+ }
+
+ public static void destroyLocalContainerNamespace() {
+ log.info("cleaning up container directory");
+ FileUtils.deleteQuietly(new File(YarnUtils.YARN_CONTAINER_DESTINATION));
+ FileUtils.deleteQuietly(new File(YarnUtils.YARN_CONTAINER_PROPERTIES));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnDataProvider.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnDataProvider.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnDataProvider.java
new file mode 100644
index 0000000..188045d
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnDataProvider.java
@@ -0,0 +1,73 @@
+package org.apache.helix.autoscale.impl.yarn;
+
+import java.util.Collection;
+
/**
 * Abstraction for a (remote) repository of yarn container meta data. Meta data
 * is read and updated by {@link YarnContainerProvider}
 * {@link YarnMasterProcess}, {@link YarnContainerProcess}.<br/>
 * <b>NOTE:</b> Each operation is assumed to be atomic.
 *
 */
interface YarnDataProvider {

    /**
     * Checks for existence of meta data about a container instance.
     *
     * @param id
     *            unique container id
     * @return true, if meta data exists
     */
    public boolean exists(String id);

    /**
     * Create meta data entry. Check for non-existence of meta data for given
     * container id and create node.
     *
     * @param data
     *            container meta data with unique id
     * @throws Exception
     *             if a meta data entry for the id already exists
     */
    public void create(YarnContainerData data) throws Exception;

    /**
     * Read meta data for given container id.
     *
     * @param id
     *            unique container id
     * @return yarn container data
     * @throws Exception
     *             if meta data entry for given id does not exist
     */
    public YarnContainerData read(String id) throws Exception;

    /**
     * Read all meta data stored for this domain space of yarn providers and
     * containers.
     *
     * @return collection of meta data entries, empty if none
     * @throws Exception
     *             if the repository cannot be read
     */
    public Collection<YarnContainerData> readAll() throws Exception;

    /**
     * Write meta data entry, replacing the existing entry for the same id.
     *
     * @param data
     *            yarn container meta data
     * @throws Exception
     *             if meta data entry for given id does not exist
     */
    public void update(YarnContainerData data) throws Exception;

    /**
     * Delete meta data entry. Frees up unique id to be reused. May throw an
     * exception on non-existence or be idempotent.
     *
     * @param id
     *            unique container id
     * @throws Exception
     *             implementation-dependent, see above
     */
    public void delete(String id) throws Exception;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnMasterProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnMasterProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnMasterProcess.java
new file mode 100644
index 0000000..25b73f5
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnMasterProcess.java
@@ -0,0 +1,144 @@
+package org.apache.helix.autoscale.impl.yarn;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.helix.autoscale.provider.ProviderProcess;
+import org.apache.log4j.Logger;
+
+/**
+ * Host process for {@link YarnContainerProviderProcess}. Hasts application
+ * master in YARN and provider participant to Helix meta cluster. (Program entry
+ * point)
+ *
+ */
+class YarnMasterProcess {
+
+ static final Logger log = Logger.getLogger(YarnMasterProcess.class);
+
+ public static void main(String[] args) throws Exception {
+ log.trace("BEGIN YarnMaster.main()");
+
+ final ApplicationAttemptId appAttemptId = getApplicationAttemptId();
+ log.info(String.format("Got application attempt id '%s'", appAttemptId.toString()));
+
+ log.debug("Reading master properties");
+ YarnMasterProperties properties = YarnUtils.createMasterProperties(YarnUtils.getPropertiesFromPath(YarnUtils.YARN_MASTER_PROPERTIES));
+
+ if (!properties.isValid())
+ throw new IllegalArgumentException(String.format("master properties not valid: %s", properties.toString()));
+
+ log.debug("Connecting to resource manager");
+ Configuration conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.RM_ADDRESS, properties.getResourceManager());
+ conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, properties.getScheduler());
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, properties.getHdfs());
+
+ final AMRMProtocol resourceManager = getResourceManager(conf);
+
+ // register the AM with the RM
+ log.debug("Registering application master");
+ RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
+ appMasterRequest.setApplicationAttemptId(appAttemptId);
+ appMasterRequest.setHost("");
+ appMasterRequest.setRpcPort(0);
+ appMasterRequest.setTrackingUrl("");
+
+ resourceManager.registerApplicationMaster(appMasterRequest);
+
+ log.debug("Starting yarndata service");
+ final ZookeeperYarnDataProvider yarnDataService = new ZookeeperYarnDataProvider(properties.getYarnData());
+ yarnDataService.start();
+
+ log.debug("Starting yarn master service");
+ final YarnMasterService service = new YarnMasterService();
+ service.configure(properties);
+ service.setAttemptId(appAttemptId);
+ service.setYarnDataProvider(yarnDataService);
+ service.setProtocol(resourceManager);
+ service.setYarnConfiguration(conf);
+ service.start();
+
+ log.debug("Starting provider");
+ final YarnContainerProvider provider = new YarnContainerProvider();
+ provider.configure(properties);
+ provider.start();
+
+ log.debug("Starting provider process");
+ final ProviderProcess process = new ProviderProcess();
+ process.configure(properties);
+ process.setConteinerProvider(provider);
+ process.start();
+
+ log.debug("Installing shutdown hooks");
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ log.debug("Stopping provider process");
+ process.stop();
+
+ log.debug("Stopping provider");
+ try { provider.stop(); } catch (Exception ignore) {}
+
+ log.debug("Stopping yarn master service");
+ service.stop();
+
+ log.debug("Stopping yarndata service");
+ yarnDataService.stop();
+
+ // finish application
+ log.debug("Sending finish request");
+ FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class);
+
+ finishReq.setAppAttemptId(getApplicationAttemptId());
+ finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+ try { resourceManager.finishApplicationMaster(finishReq); } catch(Exception ignore) {}
+ }
+ }));
+
+ log.trace("END YarnMaster.main()");
+ }
+
+ static AMRMProtocol getResourceManager(Configuration conf) {
+ // Connect to the Scheduler of the ResourceManager.
+ YarnConfiguration yarnConf = new YarnConfiguration(conf);
+ YarnRPC rpc = YarnRPC.create(yarnConf);
+ InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
+ log.info("Connecting to ResourceManager at " + rmAddress);
+ AMRMProtocol resourceManager = (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf);
+ return resourceManager;
+ }
+
+ static ApplicationAttemptId getApplicationAttemptId() {
+ ContainerId containerId = ConverterUtils.toContainerId(getEnv(ApplicationConstants.AM_CONTAINER_ID_ENV));
+ ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
+ return appAttemptID;
+ }
+
+ static String getEnv(String key) {
+ Map<String, String> envs = System.getenv();
+ String clusterName = envs.get(key);
+ if (clusterName == null) {
+ // container id should always be set in the env by the framework
+ throw new IllegalArgumentException(String.format("%s not set in the environment", key));
+ }
+ return clusterName;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnMasterProperties.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnMasterProperties.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnMasterProperties.java
new file mode 100644
index 0000000..3f49852
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnMasterProperties.java
@@ -0,0 +1,13 @@
+package org.apache.helix.autoscale.impl.yarn;
+
/**
 * Base configuration for {@link YarnMasterProcess}. Inherits all keys from
 * {@link YarnContainerProviderProperties}; exists to give the master a
 * distinct configuration type.
 *
 */
public class YarnMasterProperties extends YarnContainerProviderProperties {
    /**
     * Serialization id (inherited Properties hierarchy is Serializable).
     */
    private static final long serialVersionUID = -2209509980239674160L;

}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnMasterService.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnMasterService.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnMasterService.java
new file mode 100644
index 0000000..03d4f72
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnMasterService.java
@@ -0,0 +1,414 @@
+package org.apache.helix.autoscale.impl.yarn;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.helix.autoscale.Service;
+import org.apache.helix.autoscale.impl.yarn.YarnContainerData.ContainerState;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+/**
+ * Implements YARN application master. Continuously monitors container health in
+ * YARN and yarn meta data updates. Spawns and destroys containers.
+ *
+ */
+class YarnMasterService implements Service {
+
+ static final Logger log = Logger.getLogger(YarnMasterService.class);
+
+ static final String REQUIRED_TYPE = "container";
+
+ static final long ZOOKEEPER_TIMEOUT = 5000;
+ static final long MASTERSERVICE_INTERVAL = 1000;
+
+ static final String CONTAINERS = "CONTAINERS";
+
+ static final String YARN_CONTAINER_COMMAND = "/bin/sh %s 1>%s/stdout 2>%s/stderr";
+
+ YarnMasterProperties properties;
+ AMRMProtocol protocol;
+ ApplicationAttemptId attemptId;
+ Configuration yarnConfig;
+ YarnDataProvider yarnDataService;
+
+ final Map<ContainerId, Container> unassignedContainers = new HashMap<ContainerId, Container>();
+ final Map<ContainerId, Container> activeContainers = new HashMap<ContainerId, Container>();
+ final Map<ContainerId, ContainerStatus> completedContainers = new HashMap<ContainerId, ContainerStatus>();
+ final Map<ContainerId, String> yarn2meta = new HashMap<ContainerId, String>();
+
+ ScheduledExecutorService executor;
+
+ @Override
+ public void configure(Properties properties) throws Exception {
+ YarnMasterProperties yarnProperties = YarnUtils.createMasterProperties(properties);
+ Preconditions.checkArgument(yarnProperties.isValid());
+ this.properties = yarnProperties;
+ }
+
+ public void setProtocol(AMRMProtocol protocol) {
+ this.protocol = protocol;
+ }
+
+ public void setAttemptId(ApplicationAttemptId attemptId) {
+ this.attemptId = attemptId;
+ }
+
+ public void setYarnConfiguration(Configuration yarnConfig) {
+ this.yarnConfig = yarnConfig;
+ }
+
+ public void setYarnDataProvider(YarnDataProvider yarnDataService) {
+ this.yarnDataService = yarnDataService;
+ }
+
+ @Override
+ public void start() {
+ Preconditions.checkNotNull(properties);
+ Preconditions.checkNotNull(protocol);
+ Preconditions.checkNotNull(attemptId);
+ Preconditions.checkNotNull(yarnConfig);
+ Preconditions.checkNotNull(yarnDataService);
+
+ log.debug("starting yarn master service");
+
+ executor = Executors.newSingleThreadScheduledExecutor();
+ executor.scheduleAtFixedRate(new YarnService(), 0, MASTERSERVICE_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void stop() {
+ log.debug("stopping yarn master service");
+
+ if (executor != null) {
+ executor.shutdown();
+ while (!executor.isTerminated()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ executor = null;
+ }
+
+ destroyLocalMasterNamespace();
+ }
+
+ Collection<YarnContainerData> readOwnedYarnData() throws Exception {
+ log.debug("reading container data");
+
+ Collection<YarnContainerData> containers = new ArrayList<YarnContainerData>();
+ for (YarnContainerData meta : yarnDataService.readAll()) {
+ if (meta.owner.equals(properties.getName())) {
+ containers.add(meta);
+ log.debug(String.format("found container node '%s' (state=%s, yarnId=%s, owner=%s)", meta.id, meta.state, meta.yarnId, meta.owner));
+ }
+ }
+ return containers;
+ }
+
+ class YarnService implements Runnable {
+ int responseId = 0;
+
+ @Override
+ public void run() {
+ try {
+ log.debug("running yarn service update cycle");
+
+ Collection<YarnContainerData> yarndata = readOwnedYarnData();
+
+ // active meta containers
+ int numMetaActive = countActiveMeta(yarndata);
+
+ // newly acquired meta containers
+ int numMetaAcquire = countAcquireMeta(yarndata);
+
+ // destroyed meta containers
+ List<ContainerId> destroyedReleasedIds = createDestroyedReleaseList(yarndata);
+ int numMetaCompleted = destroyedReleasedIds.size();
+
+ int numMeta = numMetaAcquire + numMetaActive + numMetaCompleted;
+
+ // yarn containers
+ int numYarnUnassigned = unassignedContainers.size();
+ int numYarnActive = activeContainers.size();
+ int numYarnCompleted = completedContainers.size();
+ int numYarn = numYarnUnassigned + numYarnActive + numYarnCompleted;
+
+ int numYarnRequired = numMetaAcquire - numYarnUnassigned;
+
+ // additionally required containers
+ int numRequestAdditional = Math.max(0, numYarnRequired);
+
+ // overstock containers
+ List<ContainerId> unneededReleasedIds = createOverstockReleaseList(numYarnRequired);
+
+ int numReleased = destroyedReleasedIds.size() + unneededReleasedIds.size();
+
+ log.debug(String.format("meta containers (total=%d, acquire=%d, active=%d, completed=%d)", numMeta, numMetaAcquire, numMetaActive, numMetaCompleted));
+ log.debug(String.format("yarn containers (total=%d, unassigned=%d, active=%d, completed=%d)", numYarn, numYarnUnassigned, numYarnActive, numYarnCompleted));
+ log.debug(String.format("requesting %d new containers, releasing %d", numRequestAdditional, numReleased));
+
+ Priority priority = Records.newRecord(Priority.class);
+ priority.setPriority(0);
+
+ Resource resource = Records.newRecord(Resource.class);
+ resource.setMemory(256); // TODO make dynamic
+
+ ResourceRequest resourceRequest = Records.newRecord(ResourceRequest.class);
+ resourceRequest.setHostName("*");
+ resourceRequest.setNumContainers(numRequestAdditional);
+ resourceRequest.setPriority(priority);
+ resourceRequest.setCapability(resource);
+
+ AllocateRequest request = Records.newRecord(AllocateRequest.class);
+ request.setResponseId(responseId);
+ request.setApplicationAttemptId(attemptId);
+ request.addAsk(resourceRequest);
+ request.addAllReleases(destroyedReleasedIds);
+ request.addAllReleases(unneededReleasedIds);
+
+ responseId++;
+
+ AllocateResponse allocateResponse = null;
+ try {
+ allocateResponse = protocol.allocate(request);
+ } catch (YarnRemoteException e) {
+ // ignore
+ log.error("Error allocating containers", e);
+ return;
+ }
+
+ AMResponse response = allocateResponse.getAMResponse();
+
+ // remove unassigned container about to be freed
+ for (ContainerId id : unneededReleasedIds) {
+ log.info(String.format("Unassigned container '%s' about to be freed, removing", id));
+ unassignedContainers.remove(id);
+ }
+
+ // newly added containers
+ for (Container container : response.getAllocatedContainers()) {
+ unassignedContainers.put(container.getId(), container);
+ }
+
+ log.info(String.format("%d new containers available, %d required", unassignedContainers.size(), numMetaAcquire));
+
+ Iterator<Container> itYarn = unassignedContainers.values().iterator();
+ Iterator<YarnContainerData> itMeta = yarndata.iterator();
+ while (itYarn.hasNext() && itMeta.hasNext()) {
+ YarnContainerData meta = itMeta.next();
+
+ if (meta.yarnId >= 0)
+ continue;
+
+ Container containerYarn = itYarn.next();
+
+ log.debug(String.format("assigning yarn container '%s' to container node '%s'", containerYarn.getId(), meta.id));
+
+ String command = String.format(YARN_CONTAINER_COMMAND, YarnUtils.YARN_CONTAINER_PATH, ApplicationConstants.LOG_DIR_EXPANSION_VAR,
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+
+ log.debug(String.format("Running container command \"%s\"", command));
+
+ // configuration
+ YarnContainerProcessProperties containerProp = meta.getProperties();
+ containerProp.setProperty(YarnContainerProcessProperties.ADDRESS, properties.getAddress());
+ containerProp.setProperty(YarnContainerProcessProperties.CLUSTER, properties.getCluster());
+ containerProp.setProperty(YarnContainerProcessProperties.YARNDATA, properties.getYarnData());
+ containerProp.setProperty(YarnContainerProcessProperties.NAME, meta.id);
+
+ File propertiesFile = YarnUtils.writePropertiesToTemp(containerProp);
+
+ // HDFS
+ final String namespace = attemptId.getApplicationId().toString() + "/" + meta.id;
+ final Path containerArchive = YarnUtils.copyToHdfs(YarnUtils.YARN_CONTAINER_STAGING, YarnUtils.YARN_CONTAINER_STAGING, namespace, yarnConfig);
+ final Path containerProperties = YarnUtils.copyToHdfs(propertiesFile.getCanonicalPath(), YarnUtils.YARN_CONTAINER_PROPERTIES, namespace, yarnConfig);
+
+ // local resources
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ localResources.put(YarnUtils.YARN_CONTAINER_DESTINATION,
+ YarnUtils.createHdfsResource(containerArchive, LocalResourceType.ARCHIVE, yarnConfig));
+ localResources.put(YarnUtils.YARN_CONTAINER_PROPERTIES,
+ YarnUtils.createHdfsResource(containerProperties, LocalResourceType.FILE, yarnConfig));
+
+ ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
+ context.setContainerId(containerYarn.getId());
+ context.setResource(containerYarn.getResource());
+ context.setEnvironment(Maps.<String, String> newHashMap());
+ context.setCommands(Collections.singletonList(command));
+ context.setLocalResources(localResources);
+ context.setUser(properties.getUser());
+
+ log.debug(String.format("container '%s' executing command '%s'", meta.id, command));
+
+ StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
+ startReq.setContainerLaunchContext(context);
+
+ try {
+ getContainerManager(containerYarn).startContainer(startReq);
+
+ } catch (YarnRemoteException e) {
+ log.error(String.format("Error starting container '%s'", meta.id), e);
+ return;
+ }
+
+ log.debug(String.format("container '%s' started, updating container node", meta.id));
+
+ meta.setProperties(containerProp);
+ meta.setState(ContainerState.CONNECTING);
+ meta.setYarnId(containerYarn.getId().getId());
+ yarnDataService.update(meta);
+
+ yarn2meta.put(containerYarn.getId(), meta.id);
+
+ log.debug(String.format("removing '%s' from unassigned yarn containers and adding to active list", containerYarn.getId()));
+
+ itYarn.remove();
+ activeContainers.put(containerYarn.getId(), containerYarn);
+
+ // cleanup
+ propertiesFile.deleteOnExit();
+
+ }
+
+ for (ContainerStatus status : response.getCompletedContainersStatuses()) {
+ ContainerId id = status.getContainerId();
+
+ log.info(String.format("Container '%s' completed", id));
+
+ if (unassignedContainers.containsKey(id)) {
+ log.info(String.format("Unassigned container '%s' terminated, removing", id));
+ unassignedContainers.remove(id);
+ }
+
+ if (activeContainers.containsKey(id)) {
+ log.info(String.format("Active container '%s' terminated, removing", id));
+ activeContainers.remove(id);
+
+ String metaId = yarn2meta.get(id);
+ YarnContainerData meta = yarnDataService.read(metaId);
+
+ log.debug(String.format("container '%s' finalized, updating container node", meta.id));
+
+ yarnDataService.update(meta.setState(ContainerState.FINALIZE));
+ }
+
+ completedContainers.put(id, status);
+ }
+
+ log.debug("yarn service update cycle complete");
+
+ } catch (Exception e) {
+ log.error("Error while executing yarn update cycle", e);
+ }
+ }
+
+ private List<ContainerId> createOverstockReleaseList(int numYarnRequired) {
+ List<ContainerId> unneededReleasedIds = new ArrayList<ContainerId>();
+ Iterator<Container> itUnassigned = unassignedContainers.values().iterator();
+ if (numYarnRequired < 0) {
+ for (int i = 0; i < -numYarnRequired && itUnassigned.hasNext(); i++) {
+ Container container = itUnassigned.next();
+ unneededReleasedIds.add(container.getId());
+ log.debug(String.format("Container '%s' no longer required, removing", container.getId()));
+ itUnassigned.remove();
+ }
+ }
+ return unneededReleasedIds;
+ }
+
+ private List<ContainerId> createDestroyedReleaseList(Collection<YarnContainerData> yarndata) {
+ List<ContainerId> releasedIds = new ArrayList<ContainerId>();
+ for (YarnContainerData meta : yarndata) {
+ if (meta.state == ContainerState.HALTED) {
+ ContainerId containerId = Records.newRecord(ContainerId.class);
+ containerId.setApplicationAttemptId(attemptId);
+ containerId.setId(meta.yarnId);
+ releasedIds.add(containerId);
+ log.debug(String.format("releasing container '%s'", containerId));
+ }
+ }
+ return releasedIds;
+ }
+
+ private int countAcquireMeta(Collection<YarnContainerData> yarndata) {
+ int numMetaAcquire = 0;
+ for (YarnContainerData meta : yarndata) {
+ if (meta.state == ContainerState.ACQUIRE) {
+ numMetaAcquire++;
+ }
+ }
+ return numMetaAcquire;
+ }
+
+ private int countActiveMeta(Collection<YarnContainerData> yarndata) {
+ int numMetaActive = 0;
+ for (YarnContainerData meta : yarndata) {
+ if (meta.state != ContainerState.ACQUIRE && meta.state != ContainerState.HALTED && meta.state != ContainerState.FINALIZE) {
+ numMetaActive++;
+ }
+ }
+ return numMetaActive;
+ }
+ }
+
+ private ContainerManager getContainerManager(Container container) {
+ YarnConfiguration yarnConf = new YarnConfiguration(yarnConfig);
+ YarnRPC rpc = YarnRPC.create(yarnConf);
+ NodeId nodeId = container.getNodeId();
+ String containerIpPort = String.format("%s:%d", nodeId.getHost(), nodeId.getPort());
+ log.info("Connecting to ContainerManager at: " + containerIpPort);
+ InetSocketAddress addr = NetUtils.createSocketAddr(containerIpPort);
+ ContainerManager cm = (ContainerManager) rpc.getProxy(ContainerManager.class, addr, yarnConfig);
+ return cm;
+ }
+
+ public static void destroyLocalMasterNamespace() {
+ log.info("cleaning up master directory");
+ FileUtils.deleteQuietly(new File(YarnUtils.YARN_MASTER_DESTINATION));
+ FileUtils.deleteQuietly(new File(YarnUtils.YARN_MASTER_PROPERTIES));
+ FileUtils.deleteQuietly(new File(YarnUtils.YARN_CONTAINER_STAGING));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnStatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnStatusProvider.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnStatusProvider.java
new file mode 100644
index 0000000..6ec4710
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnStatusProvider.java
@@ -0,0 +1,67 @@
+package org.apache.helix.autoscale.impl.yarn;
+
+import java.util.Properties;
+
+import org.apache.helix.autoscale.StatusProviderService;
+import org.apache.helix.autoscale.impl.yarn.YarnContainerData.ContainerState;
+import org.apache.log4j.Logger;
+
+/**
+ * StatusProvider for YARN-based containers spawned via
+ * {@link YarnContainerProvider}. Reads {@link YarnDataProvider} meta data.
+ * Runnable and configurable service.
+ *
+ */
+public class YarnStatusProvider implements StatusProviderService {
+
+ static final Logger log = Logger.getLogger(YarnStatusProvider.class);
+
+ String yarndata;
+
+ ZookeeperYarnDataProvider yarnDataService;
+
+ public YarnStatusProvider() {
+ // left blank
+ }
+
+ public YarnStatusProvider(String yarndata) {
+ this.yarndata = yarndata;
+ this.yarnDataService = new ZookeeperYarnDataProvider(yarndata);
+ }
+
+ @Override
+ public void configure(Properties properties) throws Exception {
+ this.yarndata = properties.getProperty("yarndata");
+ this.yarnDataService = new ZookeeperYarnDataProvider(yarndata);
+ }
+
+ @Override
+ public void start() throws Exception {
+ yarnDataService = new ZookeeperYarnDataProvider(yarndata);
+ yarnDataService.start();
+ }
+
+ @Override
+ public void stop() throws Exception {
+ if (yarnDataService != null) {
+ yarnDataService.stop();
+ yarnDataService = null;
+ }
+ }
+
+ @Override
+ public boolean exists(String id) {
+ return yarnDataService.exists(id);
+ }
+
+ @Override
+ public boolean isHealthy(String id) {
+ try {
+ return yarnDataService.read(id).state == ContainerState.ACTIVE;
+ } catch (Exception e) {
+ log.warn(String.format("Could not get activity data of %s", id));
+ return false;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnUtils.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnUtils.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnUtils.java
new file mode 100644
index 0000000..1051696
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnUtils.java
@@ -0,0 +1,174 @@
+package org.apache.helix.autoscale.impl.yarn;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Writer;
+import java.nio.charset.Charset;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.helix.autoscale.impl.yarn.YarnContainerData.ContainerState;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.Files;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonToken;
+import com.google.gson.stream.JsonWriter;
+
+/**
+ * Utility for writing property files, transferring data via HDFS and
+ * serializing {@link YarnContainerData} for zookeeper.
+ *
+ */
+class YarnUtils {
+
+ static final Logger log = Logger.getLogger(YarnUtils.class);
+
+ static final String YARN_MASTER_ARCHIVE_PATH = "target/metamanager-assembly.tar.gz";
+ static final String YARN_MASTER_PATH = "master/metamanager/bin/yarn-master-process.sh";
+ static final String YARN_MASTER_STAGING = "master.tar.gz";
+ static final String YARN_MASTER_DESTINATION = "master";
+ static final String YARN_MASTER_PROPERTIES = "master.properties";
+ static final String YARN_CONTAINER_ARCHIVE_PATH = "target/metamanager-assembly.tar.gz";
+ static final String YARN_CONTAINER_STAGING = "container.tar.gz";
+ static final String YARN_CONTAINER_PATH = "container/metamanager/bin/yarn-container-process.sh";
+ static final String YARN_CONTAINER_DESTINATION = "container";
+ static final String YARN_CONTAINER_PROPERTIES = "container.properties";
+
+ static Gson gson;
+ static {
+ GsonBuilder builder = new GsonBuilder();
+ builder.registerTypeAdapter(ContainerState.class, new ContainerStateAdapter());
+ builder.setPrettyPrinting();
+ gson = builder.create();
+ }
+
+ public static String toJson(YarnContainerData meta) {
+ return gson.toJson(meta);
+ }
+
+ public static YarnContainerData fromJson(String json) {
+ return gson.fromJson(json, YarnContainerData.class);
+ }
+
+ public static Properties getPropertiesFromPath(String path) throws IOException {
+ Properties properties = new Properties();
+ properties.load(new InputStreamReader(new FileInputStream(path)));
+ return properties;
+ }
+
+ public static File writePropertiesToTemp(Properties properties) throws IOException {
+ File tmpFile = File.createTempFile("provider", ".properties");
+ Writer writer = Files.newWriter(tmpFile, Charset.defaultCharset());
+ properties.store(writer, null);
+ writer.flush();
+ writer.close();
+ return tmpFile;
+ }
+
+ public static Path copyToHdfs(String source, String dest, String namespace, Configuration conf) throws IOException {
+ Path sourcePath = makeQualified(source);
+ Path destPath = makeQualified(conf.get(FileSystem.FS_DEFAULT_NAME_KEY) + "/" + namespace + "/" + dest);
+ log.debug(String.format("Copying '%s' to '%s'", sourcePath, destPath));
+
+ FileSystem fs = FileSystem.get(conf);
+ fs.copyFromLocalFile(false, true, sourcePath, destPath);
+ fs.close();
+ return destPath;
+ }
+
+ public static void destroyHdfsNamespace(String namespace, Configuration conf) throws IOException {
+ Path path = makeQualified(conf.get(FileSystem.FS_DEFAULT_NAME_KEY) + "/" + namespace);
+ log.debug(String.format("Deleting '%s'", path));
+
+ FileSystem fs = FileSystem.get(conf);
+ fs.delete(path, true);
+ fs.close();
+ }
+
+ public static LocalResource createHdfsResource(Path path, LocalResourceType type, Configuration conf) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+
+ URL url = ConverterUtils.getYarnUrlFromPath(path);
+
+ FileStatus status = fs.getFileStatus(path);
+
+ LocalResource resource = Records.newRecord(LocalResource.class);
+ resource.setResource(url);
+ resource.setSize(status.getLen());
+ resource.setTimestamp(status.getModificationTime());
+ resource.setType(type);
+ resource.setVisibility(LocalResourceVisibility.APPLICATION);
+
+ fs.close();
+
+ return resource;
+ }
+
+ static Path makeQualified(String path) throws UnsupportedFileSystemException {
+ return FileContext.getFileContext().makeQualified(new Path(path));
+ }
+
+ static class ContainerStateAdapter extends TypeAdapter<ContainerState> {
+ @Override
+ public ContainerState read(JsonReader reader) throws IOException {
+ if (reader.peek() == JsonToken.NULL) {
+ reader.nextNull();
+ return null;
+ }
+ return ContainerState.valueOf(reader.nextString());
+ }
+
+ @Override
+ public void write(JsonWriter writer, ContainerState value) throws IOException {
+ if (value == null) {
+ writer.nullValue();
+ return;
+ }
+ writer.value(value.name());
+ }
+ }
+
+ static YarnContainerProcessProperties createContainerProcessProperties(Properties properties) {
+ Preconditions.checkNotNull(properties);
+ YarnContainerProcessProperties yarnProp = new YarnContainerProcessProperties();
+ yarnProp.putAll(properties);
+ return yarnProp;
+ }
+
+ static YarnContainerProviderProperties createContainerProviderProperties(Properties properties) {
+ Preconditions.checkNotNull(properties);
+ YarnContainerProviderProperties yarnProp = new YarnContainerProviderProperties();
+ yarnProp.putAll(properties);
+ return yarnProp;
+ }
+
+ static YarnMasterProperties createMasterProperties(Properties properties) {
+ Preconditions.checkNotNull(properties);
+ YarnMasterProperties yarnProp = new YarnMasterProperties();
+ yarnProp.putAll(properties);
+ return yarnProp;
+ }
+
+ private YarnUtils() {
+ // left blank
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/ZookeeperYarnDataProvider.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/ZookeeperYarnDataProvider.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/ZookeeperYarnDataProvider.java
new file mode 100644
index 0000000..32f8c79
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/ZookeeperYarnDataProvider.java
@@ -0,0 +1,100 @@
+package org.apache.helix.autoscale.impl.yarn;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.autoscale.Service;
+import org.apache.log4j.Logger;
+
+/**
+ * Configurable and runnable service for {@link YarnDataProvider} based on
+ * zookeeper.
+ *
+ */
+public class ZookeeperYarnDataProvider implements YarnDataProvider, Service {
+
+ static final Logger log = Logger.getLogger(ZookeeperYarnDataProvider.class);
+
+ static final String CONTAINER_NAMESPACE = "containers";
+
+ static final String BASE_PATH = "/" + CONTAINER_NAMESPACE;
+
+ static final int META_TIMEOUT = 5000;
+ static final long POLL_INTERVAL = 100;
+
+ String yarndata;
+
+ ZkClient client;
+
+ public ZookeeperYarnDataProvider() {
+ // left blank
+ }
+
+ public ZookeeperYarnDataProvider(String yarndataAddress) {
+ this.yarndata = yarndataAddress;
+ }
+
+ @Override
+ public void configure(Properties properties) throws Exception {
+ this.yarndata = properties.getProperty("yarndata");
+ }
+
+ @Override
+ public void start() {
+ log.debug(String.format("starting yarndata service for '%s'", yarndata));
+
+ client = new ZkClient(yarndata, META_TIMEOUT, META_TIMEOUT);
+
+ client.createPersistent(BASE_PATH, true);
+ }
+
+ @Override
+ public void stop() {
+ log.debug(String.format("stopping yarndata service for '%s'", yarndata));
+ if (client != null) {
+ client.close();
+ client = null;
+ }
+ }
+
+ @Override
+ public boolean exists(String id) {
+ return client.exists(makePath(id));
+ }
+
+ @Override
+ public void create(YarnContainerData meta) throws Exception {
+ client.createEphemeral(makePath(meta.id), YarnUtils.toJson(meta));
+ }
+
+ @Override
+ public YarnContainerData read(String id) throws Exception {
+ return YarnUtils.fromJson(client.<String> readData(makePath(id)));
+ }
+
+ @Override
+ public Collection<YarnContainerData> readAll() throws Exception {
+ Collection<YarnContainerData> yarndata = new ArrayList<YarnContainerData>();
+ for (String id : client.getChildren(BASE_PATH)) {
+ yarndata.add(YarnUtils.fromJson(client.<String> readData(makePath(id))));
+ }
+ return yarndata;
+ }
+
+ @Override
+ public void update(YarnContainerData meta) throws Exception {
+ client.writeData(makePath(meta.id), YarnUtils.toJson(meta));
+ }
+
+ @Override
+ public void delete(String id) throws Exception {
+ client.delete(makePath(id));
+ }
+
+ String makePath(String containerId) {
+ return BASE_PATH + "/" + containerId;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderProcess.java
new file mode 100644
index 0000000..2fe3166
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderProcess.java
@@ -0,0 +1,82 @@
+package org.apache.helix.autoscale.provider;
+
+import java.util.Properties;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.autoscale.ClusterAdmin;
+import org.apache.helix.autoscale.ContainerProvider;
+import org.apache.helix.autoscale.HelixClusterAdmin;
+import org.apache.helix.autoscale.Service;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Helix participant for ContainerProvider. Configurable via ProviderProperties
+ * and runnable service.
+ *
+ */
+public class ProviderProcess implements Service {
+ static final Logger log = Logger.getLogger(ProviderProcess.class);
+
+ ClusterAdmin admin;
+
+ ProviderProperties properties;
+ ContainerProvider provider;
+ HelixAdmin helixAdmin;
+ HelixManager participantManager;
+
+ @Override
+ public void configure(Properties properties) throws Exception {
+ Preconditions.checkNotNull(properties);
+ ProviderProperties providerProperties = new ProviderProperties();
+ providerProperties.putAll(properties);
+ Preconditions.checkArgument(providerProperties.isValid());
+
+ this.properties = providerProperties;
+
+ }
+
+ public void setConteinerProvider(ContainerProvider provider) {
+ this.provider = provider;
+ }
+
+ @Override
+ public void start() throws Exception {
+ Preconditions.checkNotNull(provider);
+
+ log.info(String.format("Registering provider '%s' at '%s/%s'", properties.getName(), properties.getMetaAddress(), properties.getMetaCluster()));
+ HelixAdmin metaHelixAdmin = new ZKHelixAdmin(properties.getMetaAddress());
+ metaHelixAdmin.addInstance(properties.getMetaCluster(), new InstanceConfig(properties.getName()));
+ metaHelixAdmin.close();
+
+ log.info(String.format("Starting provider '%s'", properties.getName()));
+ helixAdmin = new ZKHelixAdmin(properties.getAddress());
+ admin = new HelixClusterAdmin(properties.getCluster(), helixAdmin);
+
+ participantManager = HelixManagerFactory.getZKHelixManager(properties.getMetaCluster(), properties.getName(), InstanceType.PARTICIPANT,
+ properties.getMetaAddress());
+ participantManager.getStateMachineEngine().registerStateModelFactory("OnlineOffline", new ProviderStateModelFactory(provider, admin));
+ participantManager.connect();
+
+ log.info(String.format("Successfully started provider '%s'", properties.getName()));
+ }
+
+ @Override
+ public void stop() {
+ log.info(String.format("Stopping provider '%s'", properties.getName()));
+ if (participantManager != null) {
+ participantManager.disconnect();
+ participantManager = null;
+ }
+ if (helixAdmin != null) {
+ helixAdmin.close();
+ helixAdmin = null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderProperties.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderProperties.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderProperties.java
new file mode 100644
index 0000000..eef9fad
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderProperties.java
@@ -0,0 +1,97 @@
+package org.apache.helix.autoscale.provider;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.helix.autoscale.bootstrapper.BootUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Base configuration for {@link ProviderProcess}. Backed by
+ * {@link java.util.Properties}; well-known keys are exposed through typed
+ * getters, and per-container-type configurations are stored flattened under
+ * the "containers.&lt;id&gt;.*" key namespace.<br/>
+ * <b>NOTE:</b> lookups via {@link #get(Object)} and
+ * {@link #getProperty(String)} fail fast with an IllegalStateException on
+ * missing keys instead of returning null.
+ */
+public class ProviderProperties extends Properties {
+    // Properties is Serializable; keep a fixed id.
+    private static final long serialVersionUID = -2209509977839674160L;
+
+    public final static String ADDRESS = "address";
+    public final static String CLUSTER = "cluster";
+    public final static String METAADDRESS = "metaaddress";
+    public final static String METACLUSTER = "metacluster";
+    public final static String NAME = "name";
+
+    public final static String CONTAINER_NAMESPACE = "containers";
+
+    /** Checks that all mandatory keys are present. */
+    public boolean isValid() {
+        return(containsKey(ADDRESS) &&
+               containsKey(CLUSTER) &&
+               containsKey(METAADDRESS) &&
+               containsKey(METACLUSTER) &&
+               containsKey(NAME));
+    }
+
+    /** Zookeeper address of the managed cluster. */
+    public String getAddress() {
+        return getProperty(ADDRESS);
+    }
+
+    /** Name of the managed cluster. */
+    public String getCluster() {
+        return getProperty(CLUSTER);
+    }
+
+    /** Zookeeper address of the meta cluster. */
+    public String getMetaAddress() {
+        return getProperty(METAADDRESS);
+    }
+
+    /** Name of the meta cluster. */
+    public String getMetaCluster() {
+        return getProperty(METACLUSTER);
+    }
+
+    /** Instance name of this provider. */
+    public String getName() {
+        return getProperty(NAME);
+    }
+
+    /** Ids of all configured container types; empty set if none exist. */
+    public Set<String> getContainers() {
+        if(!BootUtils.hasNamespace(this, CONTAINER_NAMESPACE))
+            return Collections.emptySet();
+        return BootUtils.getNamespaces(BootUtils.getNamespace(this, CONTAINER_NAMESPACE));
+    }
+
+    /** Whether a container type with the given id is configured. */
+    public boolean hasContainer(String id) {
+        if(!BootUtils.hasNamespace(this, CONTAINER_NAMESPACE)) return false;
+        if(!BootUtils.hasNamespace(BootUtils.getNamespace(this, CONTAINER_NAMESPACE), id)) return false;
+        return true;
+    }
+
+    /**
+     * Configuration of a single container type.
+     *
+     * @throws IllegalArgumentException if the container type is not configured
+     */
+    public Properties getContainer(String id) {
+        Preconditions.checkArgument(BootUtils.hasNamespace(this, CONTAINER_NAMESPACE), "no container namespace");
+        Preconditions.checkArgument(BootUtils.hasNamespace(BootUtils.getNamespace(this, CONTAINER_NAMESPACE), id), "container %s not configured", id);
+        return BootUtils.getNamespace(BootUtils.getNamespace(this, CONTAINER_NAMESPACE), id);
+    }
+
+    /**
+     * Registers the configuration of a new container type.
+     *
+     * @throws IllegalArgumentException if the id is already in use
+     */
+    public void addContainer(String id, Properties properties) {
+        Preconditions.checkArgument(!getContainers().contains(id), "Already contains container type %s", id);
+
+        // flatten the per-container config into the namespaced key space
+        for(Map.Entry<Object, Object> entry : properties.entrySet()) {
+            this.put(CONTAINER_NAMESPACE + "." + id + "." + entry.getKey(), entry.getValue());
+        }
+    }
+
+    /** {@inheritDoc} Fails fast on missing keys instead of returning null. */
+    @Override
+    public Object get(Object key) {
+        Preconditions.checkState(containsKey(key));
+        return super.get(key);
+    }
+
+    /** {@inheritDoc} Fails fast on missing keys instead of returning null. */
+    @Override
+    public String getProperty(String key) {
+        Preconditions.checkState(containsKey(key));
+        return super.getProperty(key);
+    }
+
+    /**
+     * {@inheritDoc} Overridden explicitly: the inherited two-argument lookup
+     * delegates to {@link #getProperty(String)}, whose fail-fast override
+     * above would throw instead of falling back to the default value.
+     */
+    @Override
+    public String getProperty(String key, String defaultValue) {
+        if (!containsKey(key))
+            return defaultValue;
+        return super.getProperty(key);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java
new file mode 100644
index 0000000..2b6e428
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java
@@ -0,0 +1,352 @@
+package org.apache.helix.autoscale.provider;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.autoscale.StatusProvider;
+import org.apache.helix.autoscale.TargetProvider;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+/**
+ * Rebalancer for the meta cluster. Polls {@link TargetProvider} and
+ * {@link StatusProvider} and reads and sets IdealState of meta cluster participants (
+ * {@link ProviderProcess}). The number of active container is set to the target
+ * count. Failed containers are shut down and restarted on any available
+ * provider. Also, container counts are balanced across multiple providers.<br/>
+ * <b>NOTE:</b> status and target provider are injected via
+ * {@link ProviderRebalancerSingleton}<br/>
+ * <br/>
+ * <b>IdealState mapping:</b><br/>
+ * resource = container type<br/>
+ * partition = logical container instance<br/>
+ * instance = container provider<br/>
+ * status = physical container instance presence<br/>
+ */
+public class ProviderRebalancer implements Rebalancer {
+
+    static final Logger log = Logger.getLogger(ProviderRebalancer.class);
+
+    // NOTE(review): the three fields below are not referenced anywhere in this
+    // class; they look like update-throttling scaffolding - confirm whether
+    // external code uses them before removing.
+    static final long UPDATE_INTERVAL_MIN = 1500;
+
+    static final Object lock = new Object();
+    static long nextUpdate = 0;
+
+    TargetProvider targetProvider;
+    StatusProvider statusProvider;
+    HelixManager manager;
+
+    /** Injects target and status providers (see class note) and the manager. */
+    @Override
+    public void init(HelixManager manager) {
+        this.targetProvider = ProviderRebalancerSingleton.getTargetProvider();
+        this.statusProvider = ProviderRebalancerSingleton.getStatusProvider();
+        this.manager = manager;
+    }
+
+    /**
+     * Computes the provider assignment for all logical containers of one
+     * container type (= one resource). Healthy assigned containers stay
+     * ONLINE, failed or over-quota ones are sent OFFLINE, and unassigned
+     * containers are placed on the least-loaded provider until the target
+     * count is reached.
+     */
+    @Override
+    public ResourceAssignment computeResourceMapping(Resource resource, IdealState idealState, CurrentStateOutput currentStateOutput,
+            ClusterDataCache clusterData) {
+
+        final String resourceName = resource.getResourceName();
+        final String containerType = resourceName;
+
+        final SortedSet<String> allContainers = Sets.newTreeSet(new IndexedNameComparator());
+        allContainers.addAll(idealState.getPartitionSet());
+
+        final SortedSet<String> allProviders = Sets.newTreeSet(new IndexedNameComparator());
+        for (LiveInstance instance : clusterData.getLiveInstances().values()) {
+            allProviders.add(instance.getId());
+        }
+
+        final ResourceState currentState = new ResourceState(resourceName, currentStateOutput);
+
+        // target container count
+        log.debug(String.format("Retrieving target container count for type '%s'", containerType));
+        int targetCount = -1;
+        try {
+            targetCount = targetProvider.getTargetContainerCount(containerType);
+        } catch (Exception e) {
+            // without a target count there is nothing sensible to assign
+            log.error(String.format("Could not retrieve target count for '%s'", containerType), e);
+            return new ResourceAssignment(resourceName);
+        }
+
+        // provider sanity check
+        if (allProviders.isEmpty()) {
+            log.warn("Could not find any providers");
+            return new ResourceAssignment(resourceName);
+        }
+
+        // all containers
+        SortedSet<String> assignedContainers = getAssignedContainers(currentState, allContainers);
+        SortedSet<String> failedContainers = getFailedContainers(currentState, allContainers);
+
+        log.info(String.format("Rebalancing '%s' (target=%d, active=%d, failures=%d)", resourceName, targetCount, assignedContainers.size(),
+                failedContainers.size()));
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("%s: assigned containers %s", resourceName, assignedContainers));
+            log.debug(String.format("%s: failed containers %s", resourceName, failedContainers));
+        }
+
+        // each provider should host at most ceil(target / #providers) containers
+        int maxCountPerProvider = (int) Math.ceil(targetCount / (float) allProviders.size());
+
+        ResourceAssignment assignment = new ResourceAssignment(resourceName);
+        CountMap counts = new CountMap(allProviders);
+        int assignmentCount = 0;
+
+        // currently assigned: keep, unless failed or over the per-provider quota
+        for (String containerName : assignedContainers) {
+            String providerName = getProvider(currentState, containerName);
+            Partition partition = new Partition(containerName);
+
+            if (failedContainers.contains(containerName)) {
+                log.warn(String.format("Container '%s:%s' failed, going offline", providerName, containerName));
+                assignment.addReplicaMap(partition, Collections.singletonMap(providerName, "OFFLINE"));
+
+            } else if (counts.get(providerName) >= maxCountPerProvider) {
+                log.warn(String.format("Container '%s:%s' misassigned, going offline", providerName, containerName));
+                assignment.addReplicaMap(partition, Collections.singletonMap(providerName, "OFFLINE"));
+
+            } else {
+                assignment.addReplicaMap(partition, Collections.singletonMap(providerName, "ONLINE"));
+            }
+
+            counts.increment(providerName);
+            assignmentCount++;
+        }
+
+        // currently unassigned: place on least-loaded provider up to the target
+        SortedSet<String> unassignedContainers = Sets.newTreeSet(new IndexedNameComparator());
+        unassignedContainers.addAll(allContainers);
+        unassignedContainers.removeAll(assignedContainers);
+
+        for (String containerName : unassignedContainers) {
+            if (assignmentCount >= targetCount)
+                break;
+
+            String providerName = counts.getMinKey();
+            Partition partition = new Partition(containerName);
+
+            if (failedContainers.contains(containerName)) {
+                log.warn(String.format("Container '%s:%s' failed and unassigned, going offline", providerName, containerName));
+                assignment.addReplicaMap(partition, Collections.singletonMap(providerName, "OFFLINE"));
+
+            } else {
+                assignment.addReplicaMap(partition, Collections.singletonMap(providerName, "ONLINE"));
+            }
+
+            counts.increment(providerName);
+            assignmentCount++;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("assignment counts: %s", counts));
+            log.debug(String.format("assignment: %s", assignment));
+        }
+
+        return assignment;
+    }
+
+    /** Whether some provider serves, or is transitioning to serve, the container ONLINE. */
+    boolean hasProvider(ResourceState state, String containerName) {
+        Map<String, String> currentStateMap = state.getCurrentStateMap(containerName);
+        Map<String, String> pendingStateMap = state.getPendingStateMap(containerName);
+        return hasInstance(currentStateMap, "ONLINE") || hasInstance(pendingStateMap, "ONLINE");
+    }
+
+    /** The provider serving (or transitioning to serve) the container ONLINE. */
+    String getProvider(ResourceState state, String containerName) {
+        Map<String, String> currentStateMap = state.getCurrentStateMap(containerName);
+        if (hasInstance(currentStateMap, "ONLINE"))
+            return getInstance(currentStateMap, "ONLINE");
+
+        Map<String, String> pendingStateMap = state.getPendingStateMap(containerName);
+        return getInstance(pendingStateMap, "ONLINE");
+    }
+
+    /**
+     * Containers considered failed: either in Helix ERROR state, or designated
+     * ONLINE (and not in transition) while the status provider reports them
+     * as not healthy.
+     */
+    SortedSet<String> getFailedContainers(ResourceState state, Collection<String> containers) {
+        SortedSet<String> failedContainers = Sets.newTreeSet(new IndexedNameComparator());
+        for (String containerName : containers) {
+            Map<String, String> currentStateMap = state.getCurrentStateMap(containerName);
+            Map<String, String> pendingStateMap = state.getPendingStateMap(containerName);
+
+            if (hasInstance(currentStateMap, "ERROR")) {
+                failedContainers.add(containerName);
+                continue;
+            }
+
+            // not online yet, or already transitioning offline: skip health check
+            if (!hasInstance(currentStateMap, "ONLINE") || hasInstance(pendingStateMap, "OFFLINE"))
+                continue;
+
+            // container listed online and not in transition, but not active
+            if (!statusProvider.isHealthy(containerName)) {
+                log.warn(String.format("Container '%s' designated ONLINE, but is not active", containerName));
+                failedContainers.add(containerName);
+            }
+        }
+        return failedContainers;
+    }
+
+    /** Containers that currently have a provider (current or pending ONLINE). */
+    SortedSet<String> getAssignedContainers(ResourceState state, Collection<String> containers) {
+        SortedSet<String> assignedContainers = Sets.newTreeSet(new IndexedNameComparator());
+        for (String containerName : containers) {
+
+            if (!hasProvider(state, containerName))
+                continue;
+
+            assignedContainers.add(containerName);
+        }
+        return assignedContainers;
+    }
+
+    /** Whether any instance in the state map is in the given state. */
+    boolean hasInstance(Map<String, String> stateMap, String state) {
+        for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+            if (entry.getValue().equals(state)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * First instance found in the given state.
+     *
+     * @throws IllegalArgumentException if no instance is in that state
+     */
+    String getInstance(Map<String, String> stateMap, String state) {
+        for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+            if (entry.getValue().equals(state)) {
+                return entry.getKey();
+            }
+        }
+        throw new IllegalArgumentException(String.format("Could not find instance with state '%s'", state));
+    }
+
+    /**
+     * Orders names with a trailing numeric index naturally, e.g. "container2"
+     * before "container10". Names without a trailing index sort before names
+     * with one, otherwise by prefix, then by index.
+     */
+    static class IndexedNameComparator implements Comparator<String> {
+        Pattern pattern = Pattern.compile("^(.*)([0-9]+)$");
+
+        @Override
+        public int compare(String o1, String o2) {
+            Matcher m1 = pattern.matcher(o1);
+            Matcher m2 = pattern.matcher(o2);
+
+            boolean find1 = m1.find();
+            boolean find2 = m2.find();
+
+            if (!find1 && !find2)
+                return o1.compareTo(o2);
+
+            if (!find1 && find2)
+                return -1;
+
+            if (find1 && !find2)
+                return 1;
+
+            String name1 = m1.group(1);
+            String name2 = m2.group(1);
+
+            int name_comp = name1.compareTo(name2);
+            if (name_comp != 0)
+                return name_comp;
+
+            int index1 = Integer.parseInt(m1.group(2));
+            int index2 = Integer.parseInt(m2.group(2));
+
+            // fix: was "(int) Math.signum(index1 - index2)", whose subtraction
+            // can overflow for large indices; Integer.compare is overflow-safe
+            return Integer.compare(index1, index2);
+        }
+    }
+
+    /** Map from provider name to assigned-container count, with min/max lookup. */
+    static class CountMap extends HashMap<String, Integer> {
+        private static final long serialVersionUID = 3954138748385337978L;
+
+        /** Initializes every key with a count of zero. */
+        public CountMap(Collection<String> keys) {
+            super();
+            for (String key : keys) {
+                put(key, 0);
+            }
+        }
+
+        /** Fails fast on unknown keys instead of returning null. */
+        @Override
+        public Integer get(Object key) {
+            Preconditions.checkArgument(containsKey(key), "Key %s not found", key);
+            return super.get(key);
+        }
+
+        /** Increments the count of a known key and returns the new value. */
+        public int increment(String key) {
+            // get(key) above already validates key presence
+            int newValue = get(key) + 1;
+            put(key, newValue);
+            return newValue;
+        }
+
+        /** Key with the smallest count (first one found on ties). */
+        public String getMinKey() {
+            Preconditions.checkState(size() > 0, "Must contain at least one item");
+
+            String minKey = null;
+            int minValue = Integer.MAX_VALUE;
+
+            for (String key : keySet()) {
+                int value = get(key);
+                if (value < minValue) {
+                    minValue = value;
+                    minKey = key;
+                }
+            }
+
+            return minKey;
+        }
+
+        /** Key with the largest count (first one found on ties). */
+        public String getMaxKey() {
+            Preconditions.checkState(size() > 0, "Must contain at least one item");
+
+            String maxKey = null;
+            int maxValue = Integer.MIN_VALUE;
+
+            for (String key : keySet()) {
+                int value = get(key);
+                if (value > maxValue) {
+                    maxValue = value;
+                    maxKey = key;
+                }
+            }
+
+            return maxKey;
+        }
+    }
+
+    /** Per-resource view on current and pending partition states. */
+    static class ResourceState {
+        final String resourceName;
+        final CurrentStateOutput state;
+
+        public ResourceState(String resourceName, CurrentStateOutput state) {
+            this.resourceName = resourceName;
+            this.state = state;
+        }
+
+        /** instance -> state map of the partition's confirmed states. */
+        Map<String, String> getCurrentStateMap(String partitionName) {
+            return state.getCurrentStateMap(resourceName, new Partition(partitionName));
+        }
+
+        /** instance -> state map of the partition's in-flight transitions. */
+        Map<String, String> getPendingStateMap(String partitionName) {
+            return state.getPendingStateMap(resourceName, new Partition(partitionName));
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancerSingleton.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancerSingleton.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancerSingleton.java
new file mode 100644
index 0000000..16b8829
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancerSingleton.java
@@ -0,0 +1,38 @@
+package org.apache.helix.autoscale.provider;
+
+import org.apache.helix.autoscale.StatusProvider;
+import org.apache.helix.autoscale.TargetProvider;
+import org.apache.log4j.Logger;
+
+/**
+ * Utility for dependency injection into ProviderRebalancer. The rebalancer is
+ * instantiated reflectively by the Helix controller, so the target and status
+ * providers are handed over via static fields.
+ */
+public class ProviderRebalancerSingleton {
+
+    static final Logger log = Logger.getLogger(ProviderRebalancerSingleton.class);
+
+    // volatile: the setters run on the bootstrapping thread while the getters
+    // are invoked from Helix controller threads (ProviderRebalancer.init);
+    // this guarantees visibility of the injected references across threads
+    static volatile TargetProvider targetProvider;
+    static volatile StatusProvider statusProvider;
+
+    private ProviderRebalancerSingleton() {
+        // utility class, not instantiable
+    }
+
+    /** Currently injected target provider; null until set. */
+    public static TargetProvider getTargetProvider() {
+        return targetProvider;
+    }
+
+    /** Injects the target provider consumed by ProviderRebalancer. */
+    public static void setTargetProvider(TargetProvider targetProvider) {
+        ProviderRebalancerSingleton.targetProvider = targetProvider;
+    }
+
+    /** Currently injected status provider; null until set. */
+    public static StatusProvider getStatusProvider() {
+        return statusProvider;
+    }
+
+    /** Injects the status provider consumed by ProviderRebalancer. */
+    public static void setStatusProvider(StatusProvider statusProvider) {
+        ProviderRebalancerSingleton.statusProvider = statusProvider;
+    }
+
+}