You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/09/20 20:30:21 UTC
[12/15] Adding Helix-task-framework and Yarn integration modules
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/bootstrapper/ZookeeperService.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/bootstrapper/ZookeeperService.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/bootstrapper/ZookeeperService.java
new file mode 100644
index 0000000..8d78f9b
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/bootstrapper/ZookeeperService.java
@@ -0,0 +1,64 @@
+package org.apache.helix.autoscale.bootstrapper;
+
+import java.io.File;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.autoscale.Service;
+import org.apache.log4j.Logger;
+
+/**
+ * Convenience service that boots a standalone zookeeper instance for test
+ * deployments. Production setups should use a separately managed zookeeper
+ * cluster instead.
+ */
+public class ZookeeperService implements Service {
+
+  static final Logger log = Logger.getLogger(ZookeeperService.class);
+
+  String dataDir;
+  String logDir;
+  int port;
+
+  ZkServer server;
+
+  /** Reads "datadir", "logdir" and "port"; each falls back to a built-in default. */
+  @Override
+  public void configure(Properties properties) throws Exception {
+    final String portValue = properties.getProperty("port", "2199");
+    dataDir = properties.getProperty("datadir", "/tmp/zk/data");
+    logDir = properties.getProperty("logdir", "/tmp/zk/log");
+    port = Integer.parseInt(portValue);
+  }
+
+  /** Deletes the data and log directories, then creates and starts the server. */
+  @Override
+  public void start() {
+    log.info(String.format("starting zookeeper service (dataDir='%s', logDir='%s', port=%d)", dataDir, logDir, port));
+    // leftovers in both directories are removed before the server is created
+    FileUtils.deleteQuietly(new File(dataDir));
+    FileUtils.deleteQuietly(new File(logDir));
+
+    server = new ZkServer(dataDir, logDir, new IDefaultNameSpace() {
+      @Override
+      public void createDefaultNameSpace(ZkClient zkClient) {
+        // no default namespace entries are created
+      }
+    }, port);
+    server.start();
+  }
+
+  /** Shuts the server down if one is held; otherwise only logs. */
+  @Override
+  public void stop() {
+    log.info("stopping zookeeper service");
+    if (server == null) {
+      return;
+    }
+    server.shutdown();
+    server = null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerProcess.java
new file mode 100644
index 0000000..343e426
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerProcess.java
@@ -0,0 +1,133 @@
+package org.apache.helix.autoscale.container;
+
+import java.util.Properties;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.autoscale.Service;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Base service for spawn-able container types. Configures itself from
+ * Properties and connects a Helix participant to the managed cluster.
+ */
+public abstract class ContainerProcess implements Service {
+  static final Logger log = Logger.getLogger(ContainerProcess.class);
+
+  private ContainerProcessProperties properties;
+  private HelixManager participantManager;
+
+  private String modelName;
+  private StateModelFactory<? extends StateModel> modelFactory;
+
+  private String instanceName;
+  private String clusterName;
+  private String zookeeperAddress;
+
+  private boolean active = false;
+  private boolean failed = false;
+
+  public final void setModelName(String modelName) {
+    this.modelName = modelName;
+  }
+
+  public final void setModelFactory(StateModelFactory<? extends StateModel> modelFactory) {
+    this.modelFactory = modelFactory;
+  }
+
+  /** Copies the given properties, validates them and caches name, cluster and zookeeper address. */
+  @Override
+  public void configure(Properties properties) throws Exception {
+    ContainerProcessProperties containerProps = new ContainerProcessProperties();
+    containerProps.putAll(properties);
+    Preconditions.checkArgument(containerProps.isValid());
+
+    this.properties = containerProps;
+    this.instanceName = containerProps.getName();
+    this.clusterName = containerProps.getCluster();
+    this.zookeeperAddress = containerProps.getAddress();
+  }
+
+  /** Starts container, then participant; any exception is logged and marks the process failed. */
+  @Override
+  public final void start() {
+    try {
+      Preconditions.checkNotNull(modelName, "state model name not set");
+      Preconditions.checkNotNull(modelFactory, "state model factory not set");
+      Preconditions.checkState(properties.isValid(), "process properties not valid: %s", properties.toString());
+      log.info(String.format("starting container '%s'", instanceName));
+      startContainer();
+      log.info(String.format("starting helix participant '%s'", instanceName));
+      startParticipant();
+      active = true;
+    } catch (Exception e) {
+      log.error(String.format("starting container '%s' failed", instanceName), e);
+      fail();
+    }
+  }
+
+  /** Starts the concrete container; invoked by start() before the participant connects. */
+  protected abstract void startContainer() throws Exception;
+
+  private final void startParticipant() throws Exception {
+    participantManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, zookeeperAddress);
+    participantManager.getStateMachineEngine().registerStateModelFactory(modelName, modelFactory);
+    participantManager.connect();
+  }
+
+  /** Disconnects the participant, then stops the container; failures are logged, not thrown. */
+  @Override
+  public final void stop() {
+    try {
+      try {
+        log.info(String.format("stopping helix participant '%s'", instanceName));
+        stopParticipant();
+      } finally {
+        // stop the container even when the disconnect throws, so it is not skipped
+        log.info(String.format("stopping container '%s'", instanceName));
+        stopContainer();
+      }
+      active = false;
+    } catch (Exception e) {
+      log.warn(String.format("stopping container '%s' failed", instanceName), e);
+    }
+  }
+
+  /** Stops the concrete container; invoked by stop() after the participant disconnect. */
+  protected abstract void stopContainer() throws Exception;
+
+  private final void stopParticipant() {
+    if (participantManager != null) {
+      participantManager.disconnect();
+    }
+  }
+
+  public final void fail() {
+    failed = true;
+  }
+
+  public final boolean isActive() {
+    return active && !failed;
+  }
+
+  public final boolean isFailed() {
+    return failed;
+  }
+
+  public final ContainerProcessProperties getProperties() {
+    return properties;
+  }
+
+  String getModelName() {
+    return modelName;
+  }
+
+  StateModelFactory<? extends StateModel> getModelFactory() {
+    return modelFactory;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerProcessProperties.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerProcessProperties.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerProcessProperties.java
new file mode 100644
index 0000000..1096174
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerProcessProperties.java
@@ -0,0 +1,66 @@
+package org.apache.helix.autoscale.container;
+
+import java.util.Properties;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Base configuration for ContainerProcess. Lookups of missing keys fail fast
+ * with an IllegalStateException unless a default value is supplied.
+ */
+public class ContainerProcessProperties extends Properties {
+  private static final long serialVersionUID = 5754863079470995536L;
+
+  public static final String CLUSTER = "cluster";
+  public static final String ADDRESS = "address";
+  public static final String NAME = "name";
+  public static final String CONTAINER_CLASS = "class";
+
+  public ContainerProcessProperties() {
+    // starts empty
+  }
+
+  public ContainerProcessProperties(Properties properties) {
+    Preconditions.checkNotNull(properties);
+    putAll(properties);
+  }
+
+  /** Returns true if cluster, name, address and container class are all present. */
+  public boolean isValid() {
+    return containsKey(CLUSTER) && containsKey(NAME)
+        && containsKey(ADDRESS) && containsKey(CONTAINER_CLASS);
+  }
+
+  public String getCluster() {
+    return getProperty(CLUSTER);
+  }
+  public String getAddress() {
+    return getProperty(ADDRESS);
+  }
+  public String getName() {
+    return getProperty(NAME);
+  }
+  public String getContainerClass() {
+    return getProperty(CONTAINER_CLASS);
+  }
+
+  /** Strict lookup: throws IllegalStateException if the key is absent. */
+  @Override
+  public synchronized Object get(Object key) {
+    Preconditions.checkState(containsKey(key), "missing property '%s'", key);
+    return super.get(key);
+  }
+
+  /** Strict lookup: throws IllegalStateException if the key is absent. */
+  @Override
+  public String getProperty(String key) {
+    Preconditions.checkState(containsKey(key), "missing property '%s'", key);
+    return super.getProperty(key);
+  }
+
+  /** Lenient lookup; the inherited version routes through the strict getProperty(String) and would throw. */
+  @Override
+  public String getProperty(String key, String defaultValue) {
+    return containsKey(key) ? super.getProperty(key, defaultValue) : defaultValue;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerUtils.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerUtils.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerUtils.java
new file mode 100644
index 0000000..8bab01e
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerUtils.java
@@ -0,0 +1,46 @@
+package org.apache.helix.autoscale.container;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.Constructor;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Utility for loading ContainerProcessProperties and spawning ContainerProcess.
+ */
+public class ContainerUtils {
+  static final Logger log = Logger.getLogger(ContainerUtils.class);
+
+  private ContainerUtils() {
+    // static utility, not instantiable
+  }
+
+  /** Instantiates the configured container class via its (ContainerProcessProperties) constructor. */
+  public static ContainerProcess createProcess(ContainerProcessProperties properties) throws Exception {
+    String containerClassName = properties.getContainerClass();
+    Class<?> containerClass = Class.forName(containerClassName);
+    log.debug(String.format("checking for properties constructor in class '%s'", containerClassName));
+    Constructor<?> constructor = containerClass.getConstructor(ContainerProcessProperties.class);
+    return (ContainerProcess) constructor.newInstance(properties);
+  }
+
+  /** Loads properties from a system classpath resource and closes the stream; IOException if it is missing. */
+  public static ContainerProcessProperties getPropertiesFromResource(String resourceName) throws IOException {
+    java.io.InputStream in = ClassLoader.getSystemResourceAsStream(resourceName);
+    if (in == null)
+      throw new IOException(String.format("resource '%s' not found", resourceName));
+    ContainerProcessProperties properties = new ContainerProcessProperties();
+    try { properties.load(in); } finally { in.close(); }
+    return properties;
+  }
+
+  /** Loads properties from the file at filePath and closes the reader afterwards. */
+  public static ContainerProcessProperties getPropertiesFromPath(String filePath) throws IOException {
+    ContainerProcessProperties properties = new ContainerProcessProperties();
+    InputStreamReader reader = new InputStreamReader(new FileInputStream(filePath));
+    try { properties.load(reader); } finally { reader.close(); }
+    return properties;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/FileTargetProvider.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/FileTargetProvider.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/FileTargetProvider.java
new file mode 100644
index 0000000..ebbf4b6
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/FileTargetProvider.java
@@ -0,0 +1,51 @@
+package org.apache.helix.autoscale.impl;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.helix.autoscale.TargetProviderService;
+
+/**
+ * File-based target model. The container count is read from a properties file on every call, so it may change dynamically.
+ */
+public class FileTargetProvider implements TargetProviderService {
+  File file;
+
+  public FileTargetProvider() {
+    // file is set later via configure()
+  }
+
+  public FileTargetProvider(String path) {
+    this.file = new File(path);
+  }
+
+  /** Returns the count stored under containerType; IllegalArgumentException if the file has no such key. */
+  @Override
+  public int getTargetContainerCount(String containerType) throws FileNotFoundException, IOException, IllegalArgumentException {
+    Properties properties = new Properties();
+    FileReader reader = new FileReader(file);
+    try { properties.load(reader); } finally { reader.close(); }
+    // containsKey, not contains: Hashtable.contains() searches the values
+    if (!properties.containsKey(containerType))
+      throw new IllegalArgumentException(String.format("container type '%s' not found in '%s'", containerType, file.getCanonicalPath()));
+    return Integer.parseInt((String) properties.get(containerType));
+  }
+
+  @Override
+  public void configure(Properties properties) throws Exception {
+    this.file = new File(properties.getProperty("path"));
+  }
+
+  @Override
+  public void start() throws Exception {
+    // nothing to start
+  }
+
+  @Override
+  public void stop() throws Exception {
+    // nothing to stop
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/RedisTargetProvider.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/RedisTargetProvider.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/RedisTargetProvider.java
new file mode 100644
index 0000000..723ac4d
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/RedisTargetProvider.java
@@ -0,0 +1,356 @@
+package org.apache.helix.autoscale.impl;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.autoscale.TargetProviderService;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Redis-specific target model based on recurring Tps benchmarking. Tps target
+ * and probed redis-server instances are configured via zookeeper. Tps target
+ * may change dynamically.
+ *
+ */
+public class RedisTargetProvider implements TargetProviderService {
+
+ static final Logger log = Logger.getLogger(RedisTargetProvider.class);
+
+ public static final String BENCHMARK_COMMAND = "redis-benchmark";
+ public static final String BENCHMARK_TESTS = "GET,SET";
+
+ public static final String DEFAULT_RECORDS = "100000";
+ public static final String DEFAULT_CLIENTS = "20";
+ public static final String DEFAULT_REQUESTS = "100000";
+ public static final String DEFAULT_TIMEOUT = "8000";
+ public static final String DEFAULT_INTERVAL = "10000";
+ public static final String DEFAULT_ALPHA = "0.25";
+
+ ZkClient zookeeper;
+
+ String address;
+ String root;
+
+ int records;
+ int clients;
+ int requests;
+ int timeout;
+ int interval;
+
+ int targetTpsGet;
+ int targetTpsSet;
+
+ int targetCountMin;
+ int targetCountMax;
+ int targetCount;
+
+ double alpha;
+ double averageTpsGet;
+ double averageTpsSet;
+ double averageCount;
+
+ ScheduledExecutorService executor;
+
+ @Override
+ public void configure(Properties properties) {
+ address = properties.getProperty("address");
+ root = properties.getProperty("root");
+ targetTpsGet = Integer.valueOf(properties.getProperty("get", "0"));
+ targetTpsSet = Integer.valueOf(properties.getProperty("set", "0"));
+ targetCountMin = Integer.valueOf(properties.getProperty("min", "-1"));
+ targetCountMax = Integer.valueOf(properties.getProperty("max", "-1"));
+ records = Integer.valueOf(properties.getProperty("records", DEFAULT_RECORDS));
+ clients = Integer.valueOf(properties.getProperty("clients", DEFAULT_CLIENTS));
+ requests = Integer.valueOf(properties.getProperty("requests", DEFAULT_REQUESTS));
+ timeout = Integer.valueOf(properties.getProperty("timeout", DEFAULT_TIMEOUT));
+ interval = Integer.valueOf(properties.getProperty("interval", DEFAULT_INTERVAL));
+ alpha = Double.valueOf(properties.getProperty("alpha", DEFAULT_ALPHA));
+ }
+
+ @Override
+ public void start() {
+ log.debug("starting redis status service");
+ zookeeper = new ZkClient(address);
+ zookeeper.createPersistent("/" + root, true);
+
+ try { zookeeper.createPersistent("/" + root + "/target.get", String.valueOf(targetTpsGet)); } catch (Exception ignore) {}
+ try { zookeeper.createPersistent("/" + root + "/target.set", String.valueOf(targetTpsSet)); } catch (Exception ignore) {}
+ try { zookeeper.createPersistent("/" + root + "/target.min", String.valueOf(targetCountMin)); } catch (Exception ignore) {}
+ try { zookeeper.createPersistent("/" + root + "/target.max", String.valueOf(targetCountMax)); } catch (Exception ignore) {}
+
+ executor = Executors.newSingleThreadScheduledExecutor();
+ executor.scheduleAtFixedRate(new RedisBenchmarkRunnable(), 0, interval, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void stop() {
+ log.debug("stopping redis status service");
+ if (executor != null) {
+ executor.shutdownNow();
+ while (!executor.isTerminated()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ executor = null;
+ }
+ if (zookeeper != null) {
+ zookeeper.close();
+ zookeeper = null;
+ }
+ }
+
+ @Override
+ public int getTargetContainerCount(String containerType) throws Exception {
+ return targetCount;
+ }
+
+ private class RedisBenchmarkRunnable implements Runnable {
+ ExecutorService executor = Executors.newCachedThreadPool();
+ RedisResult aggregateResult;
+
+ @Override
+ public void run() {
+ log.debug("running redis benchmark");
+
+ aggregateResult = new RedisResult(0);
+ Collection<Future<RedisResult>> futures = new ArrayList<Future<RedisResult>>();
+
+ try {
+ Collection<RedisTarget> targets = getTargets();
+
+ // start benchmark
+ for (RedisTarget target : targets) {
+ log.debug(String.format("submitting target '%s'", target));
+ Future<RedisResult> future = executor.submit(new RedisCallable(target));
+ futures.add(future);
+ }
+
+ // aggregate results
+ try {
+ log.debug("waiting for results");
+
+ long limit = System.currentTimeMillis() + timeout;
+ for (Future<RedisResult> future : futures) {
+ try {
+ RedisResult result = future.get(limit - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ log.debug(String.format("got result '%s'", result));
+ aggregate(result);
+ } catch (Exception e) {
+ log.warn(String.format("failed to get result"));
+ future.cancel(true);
+ }
+ }
+ } catch (Exception e) {
+ log.error("Error running redis benchmark", e);
+
+ for (Future<RedisResult> future : futures) {
+ future.cancel(true);
+ }
+
+ return;
+ }
+
+ // compare to thresholds
+ log.debug(String.format("aggregate result is '%s'", aggregateResult));
+
+ // get target from zookeeper
+ try { targetTpsGet = Integer.valueOf(zookeeper.<String> readData("/" + root + "/target.get")); } catch (Exception ignore) {}
+ try { targetTpsSet = Integer.valueOf(zookeeper.<String> readData("/" + root + "/target.set")); } catch (Exception ignore) {}
+ try { targetCountMin = Integer.valueOf(zookeeper.<String> readData("/" + root + "/target.min")); } catch (Exception ignore) {}
+ try { targetCountMax = Integer.valueOf(zookeeper.<String> readData("/" + root + "/target.max")); } catch (Exception ignore) {}
+
+ averageCount = alpha * aggregateResult.serverCount + (1.0 - alpha) * averageCount;
+
+ // calculate counts
+ int targetCountGet = -1;
+ if (aggregateResult.containsKey("GET")) {
+ double tpsTarget = targetTpsGet;
+ double tps = aggregateResult.get("GET");
+
+ averageTpsGet = alpha * tps + (1.0 - alpha) * averageTpsGet;
+
+ targetCountGet = (int) Math.ceil(tpsTarget / averageTpsGet * averageCount);
+ log.debug(String.format("count.get=%d, target.get=%f, tps.get=%f, tps.avg.get=%f, count.avg=%f", targetCountGet, tpsTarget, tps,
+ averageTpsGet, averageCount));
+ }
+
+ int targetCountSet = -1;
+ if (aggregateResult.containsKey("SET")) {
+ double tpsTarget = targetTpsSet;
+ double tps = aggregateResult.get("SET");
+
+ averageTpsSet = alpha * tps + (1.0 - alpha) * averageTpsSet;
+
+ targetCountSet = (int) Math.ceil(tpsTarget / averageTpsSet * averageCount);
+ log.debug(String.format("count.set=%d, target.set=%f, tps.set=%f, tps.avg.set=%f, count.avg=%f", targetCountSet, tpsTarget, tps,
+ averageTpsSet, averageCount));
+ }
+
+ targetCount = Math.max(targetCountGet, targetCountSet);
+
+ if (targetCountMin > 0)
+ targetCount = Math.max(targetCount, targetCountMin);
+ if (targetCountMax > 0)
+ targetCount = Math.min(targetCount, targetCountMax);
+
+ targetCount = Math.max(targetCount, 1);
+
+ log.debug(String.format("target count is %d", targetCount));
+ RedisTargetProvider.this.targetCount = targetCount;
+
+ } catch (Exception e) {
+ log.error("Error running redis benchmark", e);
+
+ for (Future<RedisResult> future : futures) {
+ future.cancel(true);
+ }
+ }
+
+ }
+
+ Collection<RedisTarget> getTargets() {
+ log.debug("fetching redis servers from zookeeper");
+ Collection<RedisTarget> targets = new ArrayList<RedisTarget>();
+ Collection<String> servers = zookeeper.getChildren("/" + root);
+
+ servers.remove("target.get");
+ servers.remove("target.set");
+ servers.remove("target.min");
+ servers.remove("target.max");
+
+ for (String server : servers) {
+ if (!zookeeper.exists("/" + root + "/" + server + "/heartbeat"))
+ continue;
+
+ String hostname = zookeeper.readData("/" + root + "/" + server + "/hostname");
+ int port = Integer.valueOf(zookeeper.<String> readData("/" + root + "/" + server + "/port"));
+
+ targets.add(new RedisTarget(hostname, port));
+ }
+
+ log.debug(String.format("found %d servers: %s", targets.size(), targets));
+ return targets;
+ }
+
+ void aggregate(RedisResult result) {
+ RedisResult newResult = new RedisResult(aggregateResult.serverCount + result.serverCount);
+
+ for (Entry<String, Double> entry : result.entrySet()) {
+ double current = 0.0d;
+ if (aggregateResult.containsKey(entry.getKey()))
+ current = aggregateResult.get(entry.getKey());
+
+ current += entry.getValue();
+ newResult.put(entry.getKey(), current);
+ }
+
+ aggregateResult = newResult;
+ }
+ }
+
+ private static class RedisTarget {
+ final String hostname;
+ final int port;
+
+ public RedisTarget(String hostname, int port) {
+ this.hostname = hostname;
+ this.port = port;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s:%d", hostname, port);
+ }
+ }
+
+ private static class RedisResult extends HashMap<String, Double> {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 4599748807597500952L;
+
+ final int serverCount;
+
+ public RedisResult(int serverCount) {
+ this.serverCount = serverCount;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("[serverCount=%d %s]", serverCount, super.toString());
+ }
+ }
+
+ private class RedisCallable implements Callable<RedisResult> {
+ final RedisTarget target;
+
+ public RedisCallable(RedisTarget target) {
+ this.target = target;
+ }
+
+ @Override
+ public RedisResult call() throws Exception {
+ log.debug(String.format("executing benchmark for '%s'", target));
+
+ ProcessBuilder builder = new ProcessBuilder();
+ builder.command(BENCHMARK_COMMAND, "-h", target.hostname, "-p", String.valueOf(target.port), "-r", String.valueOf(records), "-n",
+ String.valueOf(requests), "-c", String.valueOf(clients), "-t", BENCHMARK_TESTS, "--csv");
+ Process process = builder.start();
+
+ log.debug(String.format("running '%s'", builder.command()));
+
+ RedisResult result = new RedisResult(1);
+
+ int retVal;
+ try {
+ retVal = process.waitFor();
+ } catch (InterruptedException e) {
+ process.destroy();
+ return result;
+ }
+
+ Preconditions.checkState(retVal == 0, "Benchmark process returned %s", retVal);
+
+ Pattern pattern = Pattern.compile("\"([A-Z0-9_]+).*\",\"([0-9\\.]+)\"");
+
+ log.debug("parsing output");
+ BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ Matcher matcher = pattern.matcher(line);
+
+ if (!matcher.find())
+ continue;
+
+ String key = matcher.group(1);
+ Double value = Double.valueOf(matcher.group(2));
+
+ result.put(key, value);
+ }
+
+ log.debug(String.format("benchmark for '%s' returned '%s'", target, result));
+
+ return result;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/StaticTargetProvider.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/StaticTargetProvider.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/StaticTargetProvider.java
new file mode 100644
index 0000000..346f0fe
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/StaticTargetProvider.java
@@ -0,0 +1,62 @@
+package org.apache.helix.autoscale.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.helix.autoscale.TargetProviderService;
+import org.apache.log4j.Logger;
+
+/**
+ * Target model based on manually set count. Count may change dynamically.
+ */
+public class StaticTargetProvider implements TargetProviderService {
+  static final Logger log = Logger.getLogger(StaticTargetProvider.class);
+  final Map<String, Integer> targetCounts = new HashMap<String, Integer>();
+
+  public StaticTargetProvider() {
+  }
+
+  public StaticTargetProvider(Map<String, Integer> targetCounts) {
+    this.targetCounts.putAll(targetCounts);
+  }
+
+  /** Returns the registered count; IllegalArgumentException (rather than an unboxing NPE) for unknown types. */
+  @Override
+  public int getTargetContainerCount(String containerType) {
+    Integer count = targetCounts.get(containerType);
+    if (count == null)
+      throw new IllegalArgumentException(String.format("container type '%s' not found", containerType));
+    return count;
+  }
+
+  public void setTargetContainerCount(String containerType, int targetCount) {
+    targetCounts.put(containerType, targetCount);
+  }
+
+  /** Registers every property whose value parses as an integer; other entries are skipped with a warning. */
+  @Override
+  public void configure(Properties properties) throws Exception {
+    for (Entry<Object, Object> entry : properties.entrySet()) {
+      String key = (String) entry.getKey();
+      try {
+        int value = Integer.valueOf((String) entry.getValue());
+        log.debug(String.format("Inserting value '%s = %d'", key, value));
+        targetCounts.put(key, value);
+      } catch (NumberFormatException e) {
+        log.warn(String.format("Skipping '%s', not an integer (value='%s')", key, (String) entry.getValue()));
+      }
+    }
+  }
+
+  @Override
+  public void start() throws Exception {
+    // nothing to start
+  }
+
+  @Override
+  public void stop() throws Exception {
+    // nothing to stop
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/DummyMasterSlaveProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/DummyMasterSlaveProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/DummyMasterSlaveProcess.java
new file mode 100644
index 0000000..683249d
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/DummyMasterSlaveProcess.java
@@ -0,0 +1,76 @@
+package org.apache.helix.autoscale.impl.container;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.autoscale.container.ContainerProcess;
+import org.apache.helix.autoscale.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * Sample container running a Helix participant for the MasterSlave state
+ * model. State transitions are only logged.
+ */
+public class DummyMasterSlaveProcess extends ContainerProcess {
+
+  static final Logger log = Logger.getLogger(DummyMasterSlaveProcess.class);
+
+  public DummyMasterSlaveProcess(ContainerProcessProperties properties) throws Exception {
+    configure(properties);
+    setModelFactory(new DummyMasterSlaveModelFactory());
+    setModelName("MasterSlave");
+  }
+
+  @Override
+  protected void startContainer() throws Exception {
+    log.info("starting dummy process container");
+  }
+
+  @Override
+  protected void stopContainer() throws Exception {
+    log.info("stopping dummy process container");
+  }
+
+  /** Creates one DummyMasterSlaveStateModel per call. */
+  public static class DummyMasterSlaveModelFactory extends StateModelFactory<DummyMasterSlaveStateModel> {
+    @Override
+    public DummyMasterSlaveStateModel createNewStateModel(String partitionName) {
+      return new DummyMasterSlaveStateModel();
+    }
+  }
+
+  /** State model whose transition callbacks only trace-log the instance name. */
+  @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "SLAVE", "MASTER", "DROPPED" })
+  public static class DummyMasterSlaveStateModel extends StateModel {
+    static final Logger log = Logger.getLogger(DummyMasterSlaveStateModel.class);
+
+    @Transition(from = "OFFLINE", to = "SLAVE")
+    public void offlineToSlave(Message m, NotificationContext context) {
+      final String instance = context.getManager().getInstanceName();
+      log.trace(String.format("%s transitioning from OFFLINE to SLAVE", instance));
+    }
+    @Transition(from = "SLAVE", to = "OFFLINE")
+    public void slaveToOffline(Message m, NotificationContext context) {
+      final String instance = context.getManager().getInstanceName();
+      log.trace(String.format("%s transitioning from SLAVE to OFFLINE", instance));
+    }
+    @Transition(from = "SLAVE", to = "MASTER")
+    public void slaveToMaster(Message m, NotificationContext context) {
+      final String instance = context.getManager().getInstanceName();
+      log.trace(String.format("%s transitioning from SLAVE to MASTER", instance));
+    }
+    @Transition(from = "MASTER", to = "SLAVE")
+    public void masterToSlave(Message m, NotificationContext context) {
+      final String instance = context.getManager().getInstanceName();
+      log.trace(String.format("%s transitioning from MASTER to SLAVE", instance));
+    }
+    @Transition(from = "OFFLINE", to = "DROPPED")
+    public void offlineToDropped(Message m, NotificationContext context) {
+      final String instance = context.getManager().getInstanceName();
+      log.trace(String.format("%s transitioning from OFFLINE to DROPPED", instance));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/DummyOnlineOfflineProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/DummyOnlineOfflineProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/DummyOnlineOfflineProcess.java
new file mode 100644
index 0000000..a0aad8e
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/DummyOnlineOfflineProcess.java
@@ -0,0 +1,66 @@
+package org.apache.helix.autoscale.impl.container;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.autoscale.container.ContainerProcess;
+import org.apache.helix.autoscale.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * Sample implementation of container with Helix participant for OnlineOffline
+ * state model. Print state transitions only.
+ *
+ */
+public class DummyOnlineOfflineProcess extends ContainerProcess {
+
+ static final Logger log = Logger.getLogger(DummyOnlineOfflineProcess.class);
+
+ public DummyOnlineOfflineProcess(ContainerProcessProperties properties) throws Exception {
+ configure(properties);
+ setModelName("OnlineOffline");
+ setModelFactory(new DummyOnlineOfflineModelFactory());
+ }
+
+ @Override
+ protected void startContainer() throws Exception {
+ log.info("starting dummy online-offline process container");
+ }
+
+ @Override
+ protected void stopContainer() throws Exception {
+ log.info("stopping dummy online-offline process container");
+ }
+
+ public static class DummyOnlineOfflineModelFactory extends StateModelFactory<DummyOnlineOfflineStateModel> {
+ @Override
+ public DummyOnlineOfflineStateModel createNewStateModel(String partitionName) {
+ return new DummyOnlineOfflineStateModel();
+ }
+ }
+
+ @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "ONLINE", "DROPPED" })
+ public static class DummyOnlineOfflineStateModel extends StateModel {
+
+ static final Logger log = Logger.getLogger(DummyOnlineOfflineStateModel.class);
+
+ @Transition(from = "OFFLINE", to = "ONLINE")
+ public void offlineToOnline(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from OFFLINE to ONLINE", context.getManager().getInstanceName()));
+ }
+
+ @Transition(from = "ONLINE", to = "OFFLINE")
+ public void onlineToOffline(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from ONLINE to OFFLINE", context.getManager().getInstanceName()));
+ }
+
+ @Transition(from = "OFFLINE", to = "DROPPED")
+ public void offlineToDropped(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from OFFLINE to DROPPED", context.getManager().getInstanceName()));
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/RedisServerProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/RedisServerProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/RedisServerProcess.java
new file mode 100644
index 0000000..5f6f745
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/RedisServerProcess.java
@@ -0,0 +1,140 @@
+package org.apache.helix.autoscale.impl.container;
+
+import java.net.InetAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.autoscale.container.ContainerProcess;
+import org.apache.helix.autoscale.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * Container implementation for redis-server. Uses OnlineOffline model, spawns
+ * Redis as Shell process and writes metadata to zookeeper.
+ *
+ */
+public class RedisServerProcess extends ContainerProcess {
+
+ static final Logger log = Logger.getLogger(RedisServerProcess.class);
+
+ public static final String REDIS_SERVER_COMMAND = "redis-server";
+
+ public static final long MONITOR_INTERVAL = 5000;
+
+ ZkClient zookeeper;
+
+ final String address;
+ final String root;
+ final String name;
+ final int basePort;
+
+ Process process;
+
+ ScheduledExecutorService executor;
+
+ public RedisServerProcess(ContainerProcessProperties properties) throws Exception {
+ configure(properties);
+ setModelName("OnlineOffline");
+ setModelFactory(new RedisServerModelFactory());
+
+ address = properties.getProperty("address");
+ root = properties.getProperty("root");
+ basePort = Integer.valueOf(properties.getProperty("baseport"));
+ name = properties.getProperty(ContainerProcessProperties.NAME);
+ }
+
+ @Override
+ protected void startContainer() throws Exception {
+ log.info(String.format("starting redis server container for instance '%s'", name));
+
+ String hostname = InetAddress.getLocalHost().getHostName();
+ int port = basePort + Integer.valueOf(name.split("_")[1]);
+
+ log.debug(String.format("Starting redis server at '%s:%d'", hostname, port));
+
+ ProcessBuilder builder = new ProcessBuilder();
+ builder.command(REDIS_SERVER_COMMAND, "--port", String.valueOf(port));
+ process = builder.start();
+
+ log.debug("Updating zookeeper");
+ zookeeper = new ZkClient(address);
+ zookeeper.deleteRecursive("/" + root + "/" + name);
+ zookeeper.createPersistent("/" + root + "/" + name, true);
+ zookeeper.createPersistent("/" + root + "/" + name + "/hostname", hostname);
+ zookeeper.createPersistent("/" + root + "/" + name + "/port", String.valueOf(port));
+ zookeeper.createEphemeral("/" + root + "/" + name + "/heartbeat");
+
+ log.debug("Starting process monitor");
+ executor = Executors.newSingleThreadScheduledExecutor();
+ executor.scheduleAtFixedRate(new ProcessMonitor(), 0, MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
+
+ }
+
+ @Override
+ protected void stopContainer() throws Exception {
+ log.info("stopping redis server container");
+
+ log.debug("Stopping process monitor");
+ executor.shutdownNow();
+
+ log.debug("Updating zookeeper");
+ zookeeper.deleteRecursive("/" + root + "/" + name);
+ zookeeper.close();
+
+ log.debug("Stopping process");
+ process.destroy();
+ process.waitFor();
+ }
+
+ public class RedisServerModelFactory extends StateModelFactory<RedisServerModel> {
+ @Override
+ public RedisServerModel createNewStateModel(String partitionName) {
+ return new RedisServerModel();
+ }
+ }
+
+ @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "ONLINE", "DROPPED" })
+ public class RedisServerModel extends StateModel {
+
+ @Transition(from = "OFFLINE", to = "ONLINE")
+ public void offlineToSlave(Message m, NotificationContext context) {
+ // left blank
+ log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+ }
+
+ @Transition(from = "ONLINE", to = "OFFLINE")
+ public void slaveToOffline(Message m, NotificationContext context) {
+ // left blank
+ log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+ }
+
+ @Transition(from = "OFFLINE", to = "DROPPED")
+ public void offlineToDropped(Message m, NotificationContext context) {
+ // left blank
+ log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+ }
+
+ }
+
+ private class ProcessMonitor implements Runnable {
+ @Override
+ public void run() {
+ try {
+ process.exitValue();
+ log.warn("detected process failure");
+ fail();
+ } catch (Exception e) {
+ // expected
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/ZookeeperMasterSlaveProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/ZookeeperMasterSlaveProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/ZookeeperMasterSlaveProcess.java
new file mode 100644
index 0000000..0c1b728
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/ZookeeperMasterSlaveProcess.java
@@ -0,0 +1,108 @@
+package org.apache.helix.autoscale.impl.container;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.autoscale.container.ContainerProcess;
+import org.apache.helix.autoscale.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * Sample implementation of container with Helix participant for MasterSlave
+ * state model. Writes current state to separate zookeeper domain.
+ *
+ */
+ public class ZookeeperMasterSlaveProcess extends ContainerProcess {
+
+   static final Logger log = Logger.getLogger(ZookeeperMasterSlaveProcess.class);
+
+   ZkClient zookeeper;
+
+   final String address;
+   final String root;
+   final String name;
+
+   /**
+    * Configures the container for the "MasterSlave" model and reads the
+    * "address" and "root" properties plus the container name.
+    *
+    * @param properties container configuration
+    * @throws Exception if configuration fails
+    */
+   public ZookeeperMasterSlaveProcess(ContainerProcessProperties properties) throws Exception {
+     configure(properties);
+     setModelName("MasterSlave");
+     setModelFactory(new ZookeeperMasterSlaveModelFactory());
+
+     address = properties.getProperty("address");
+     root = properties.getProperty("root");
+     name = properties.getProperty(ContainerProcessProperties.NAME);
+   }
+
+   /** Connects to zookeeper and creates the persistent node {@code /<root>/<name>}. */
+   @Override
+   protected void startContainer() throws Exception {
+     log.info("starting zookeeper process container");
+
+     zookeeper = new ZkClient(address);
+     zookeeper.createPersistent(containerPath(), true);
+   }
+
+   /** Closes the zookeeper connection. */
+   @Override
+   protected void stopContainer() throws Exception {
+     log.info("stopping zookeeper process container");
+
+     zookeeper.close();
+   }
+
+   /** Returns the zookeeper path of this container, {@code /<root>/<name>}. */
+   private String containerPath() {
+     return "/" + root + "/" + name;
+   }
+
+   /** Factory handing out a fresh {@link ZookeeperMasterSlaveModel} per call; the partition name is not used. */
+   public class ZookeeperMasterSlaveModelFactory extends StateModelFactory<ZookeeperMasterSlaveModel> {
+     @Override
+     public ZookeeperMasterSlaveModel createNewStateModel(String partitionName) {
+       return new ZookeeperMasterSlaveModel();
+     }
+   }
+
+   /**
+    * MasterSlave state model mirroring the target state of every partition
+    * into an ephemeral node below the container's zookeeper path.
+    */
+   @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "SLAVE", "MASTER", "DROPPED" })
+   public class ZookeeperMasterSlaveModel extends StateModel {
+
+     /** Handles OFFLINE to SLAVE by delegating to {@link #transition}. */
+     @Transition(from = "OFFLINE", to = "SLAVE")
+     public void offlineToSlave(Message m, NotificationContext context) {
+       transition(m, context);
+     }
+
+     /** Handles SLAVE to OFFLINE by delegating to {@link #transition}. */
+     @Transition(from = "SLAVE", to = "OFFLINE")
+     public void slaveToOffline(Message m, NotificationContext context) {
+       transition(m, context);
+     }
+
+     /** Handles SLAVE to MASTER by delegating to {@link #transition}. */
+     @Transition(from = "SLAVE", to = "MASTER")
+     public void slaveToMaster(Message m, NotificationContext context) {
+       transition(m, context);
+     }
+
+     /** Handles MASTER to SLAVE by delegating to {@link #transition}. */
+     @Transition(from = "MASTER", to = "SLAVE")
+     public void masterToSlave(Message m, NotificationContext context) {
+       transition(m, context);
+     }
+
+     /** Handles OFFLINE to DROPPED: traces the transition and deletes the partition node. */
+     @Transition(from = "OFFLINE", to = "DROPPED")
+     public void offlineToDropped(Message m, NotificationContext context) {
+       traceTransition(m, context);
+
+       zookeeper.delete(partitionPath(m));
+     }
+
+     /**
+      * Traces the transition, deletes the partition node and re-creates it
+      * as an ephemeral node holding the message's target state.
+      */
+     public void transition(Message m, NotificationContext context) {
+       traceTransition(m, context);
+
+       String nodePath = partitionPath(m);
+       zookeeper.delete(nodePath);
+       zookeeper.createEphemeral(nodePath, m.getToState());
+     }
+
+     /** Logs instance name, source state and target state at TRACE level. */
+     private void traceTransition(Message m, NotificationContext context) {
+       log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+     }
+
+     /** Builds {@code /<root>/<name>/<resource>_<partition>} from the message. */
+     private String partitionPath(Message m) {
+       String resource = m.getResourceName();
+       String partition = m.getPartitionName();
+       return "/" + root + "/" + name + "/" + resource + "_" + partition;
+     }
+
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerProvider.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerProvider.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerProvider.java
new file mode 100644
index 0000000..7e5d553
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerProvider.java
@@ -0,0 +1,119 @@
+package org.apache.helix.autoscale.impl.local;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.helix.autoscale.ContainerProvider;
+import org.apache.helix.autoscale.ContainerProviderService;
+import org.apache.helix.autoscale.container.ContainerProcess;
+import org.apache.helix.autoscale.container.ContainerProcessProperties;
+import org.apache.helix.autoscale.container.ContainerUtils;
+import org.apache.helix.autoscale.impl.local.LocalContainerSingleton.LocalProcess;
+import org.apache.helix.autoscale.provider.ProviderProperties;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * {@link ContainerProvider} spawning VM-local containers. Only works in single-VM
+ * deployments as container metadata is managed via singleton.
+ *
+ * @see LocalContainerSingleton
+ */
+ class LocalContainerProvider implements ContainerProviderService {
+
+   static final Logger log = Logger.getLogger(LocalContainerProvider.class);
+
+   /** Container type name mapped to the properties used to instantiate it. */
+   final Map<String, Properties> types = new HashMap<String, Properties>();
+
+   String address;
+   String cluster;
+   String name;
+
+   /**
+    * Validates the given properties, stores address, cluster and provider
+    * name, and registers every container type they declare.
+    *
+    * @param properties provider configuration
+    * @throws IllegalArgumentException if the properties are not valid
+    */
+   @Override
+   public void configure(Properties properties) throws Exception {
+     ProviderProperties providerProperties = new ProviderProperties();
+     providerProperties.putAll(properties);
+     Preconditions.checkArgument(providerProperties.isValid(), "provider properties not valid (properties='%s')", properties);
+
+     this.address = providerProperties.getProperty("address");
+     this.cluster = providerProperties.getProperty("cluster");
+     this.name = providerProperties.getProperty("name");
+
+     for (String containerType : providerProperties.getContainers()) {
+       registerType(containerType, providerProperties.getContainer(containerType));
+     }
+   }
+
+   @Override
+   public void start() throws Exception {
+     // left blank
+   }
+
+   /** Destroys all containers owned by this provider. */
+   @Override
+   public void stop() throws Exception {
+     destroyAll();
+   }
+
+   /**
+    * Creates and starts a VM-local container of the given type and records
+    * it in the shared process registry under {@code id}.
+    *
+    * @param id unique container id
+    * @param type name of a registered container type
+    * @throws IllegalStateException if the id is already in use or the type
+    *           is not registered
+    * @throws Exception if the container cannot be created or started
+    */
+   @Override
+   public void create(String id, String type) throws Exception {
+     Map<String, LocalProcess> processes = LocalContainerSingleton.getProcesses();
+
+     synchronized (processes) {
+       Preconditions.checkState(!processes.containsKey(id), "Process '%s' already exists", id);
+       Preconditions.checkState(types.containsKey(type), "Type '%s' is not registered", type);
+
+       ContainerProcessProperties properties = new ContainerProcessProperties(types.get(type));
+
+       properties.setProperty(ContainerProcessProperties.CLUSTER, cluster);
+       properties.setProperty(ContainerProcessProperties.NAME, id);
+       properties.setProperty(ContainerProcessProperties.ADDRESS, address);
+
+       log.info(String.format("Running container '%s' (properties='%s')", id, properties));
+
+       ContainerProcess process = ContainerUtils.createProcess(properties);
+       process.start();
+
+       // registered only after a successful start
+       processes.put(id, new LocalProcess(id, name, process));
+     }
+   }
+
+   /**
+    * Removes the container from the registry and stops it.
+    *
+    * @param id container id
+    * @throws IllegalArgumentException if no container with this id exists
+    */
+   @Override
+   public void destroy(String id) throws Exception {
+     Map<String, LocalProcess> processes = LocalContainerSingleton.getProcesses();
+
+     synchronized (processes) {
+       if (!processes.containsKey(id)) {
+         throw new IllegalArgumentException(String.format("Process '%s' does not exist", id));
+       }
+
+       log.info(String.format("Destroying container '%s'", id));
+
+       LocalProcess local = processes.remove(id);
+
+       local.process.stop();
+     }
+   }
+
+   /**
+    * Destroys every registered container whose owner is this provider.
+    * A failure to destroy one container is logged and does not prevent the
+    * remaining containers from being destroyed.
+    */
+   @Override
+   public void destroyAll() {
+     Map<String, LocalProcess> processes = LocalContainerSingleton.getProcesses();
+
+     synchronized (processes) {
+       log.info("Destroying all processes");
+       // iterate over a copy: destroy() removes entries from the registry
+       for (LocalProcess local : new HashSet<LocalProcess>(processes.values())) {
+         if (local.owner.equals(name)) {
+           try {
+             destroy(local.id);
+           } catch (Exception e) {
+             log.warn(String.format("Could not destroy container '%s'", local.id), e);
+           }
+         }
+       }
+     }
+   }
+
+   /** Registers (or replaces) the properties used for containers of type {@code name}. */
+   void registerType(String name, Properties properties) {
+     log.debug(String.format("Registering container type '%s' (properties='%s')", name, properties));
+     types.put(name, properties);
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerProviderProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerProviderProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerProviderProcess.java
new file mode 100644
index 0000000..ca1047c
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerProviderProcess.java
@@ -0,0 +1,45 @@
+package org.apache.helix.autoscale.impl.local;
+
+import java.util.Properties;
+
+import org.apache.helix.autoscale.Service;
+import org.apache.helix.autoscale.provider.ProviderProcess;
+import org.apache.helix.autoscale.provider.ProviderProperties;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Configurable and runnable service for {@link LocalContainerProvider}.
+ *
+ */
+ public class LocalContainerProviderProcess implements Service {
+   LocalContainerProvider provider;
+   ProviderProcess process;
+
+   /**
+    * Validates the properties and builds both the container provider and
+    * the provider process that is wired to it.
+    *
+    * @param properties provider configuration
+    * @throws IllegalArgumentException if the properties are not valid
+    */
+   @Override
+   public void configure(Properties properties) throws Exception {
+     ProviderProperties providerProperties = new ProviderProperties();
+     providerProperties.putAll(properties);
+
+     Preconditions.checkArgument(providerProperties.isValid(), "provider properties not valid (properties='%s')", properties);
+
+     provider = new LocalContainerProvider();
+     provider.configure(properties);
+
+     process = new ProviderProcess();
+     process.configure(providerProperties);
+     process.setConteinerProvider(provider);
+   }
+
+   /** Starts the provider first, then the provider process. */
+   @Override
+   public void start() throws Exception {
+     provider.start();
+     process.start();
+   }
+
+   /**
+    * Stops the provider process, then the provider. The provider is stopped
+    * even when stopping the process throws, so its containers are not left
+    * running; the exception is still propagated.
+    */
+   @Override
+   public void stop() throws Exception {
+     try {
+       process.stop();
+     } finally {
+       provider.stop();
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerSingleton.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerSingleton.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerSingleton.java
new file mode 100644
index 0000000..74f9279
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerSingleton.java
@@ -0,0 +1,56 @@
+package org.apache.helix.autoscale.impl.local;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.autoscale.container.ContainerProcess;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Singleton tracking metadata for VM-local containers spawned via
+ * {@link LocalContainerProvider}.
+ *
+ */
+ public class LocalContainerSingleton {
+   /** Registry of VM-local containers keyed by id; callers synchronize on the map itself. */
+   final static Map<String, LocalProcess> processes = new HashMap<String, LocalProcess>();
+
+   private LocalContainerSingleton() {
+     // left blank
+   }
+
+   /**
+    * Returns the shared, mutable registry. Callers in this file guard every
+    * access with {@code synchronized} on the returned map.
+    */
+   public static Map<String, LocalProcess> getProcesses() {
+     return processes;
+   }
+
+   /**
+    * Stops every registered container and empties the registry.
+    * A container that fails to stop does not prevent the remaining ones
+    * from being stopped or the registry from being cleared; the first
+    * failure is rethrown once cleanup is complete.
+    */
+   public static void reset() {
+     synchronized (processes) {
+       RuntimeException firstFailure = null;
+       for (LocalProcess local : processes.values()) {
+         try {
+           local.process.stop();
+         } catch (RuntimeException e) {
+           if (firstFailure == null) {
+             firstFailure = e;
+           }
+         }
+       }
+       processes.clear();
+       if (firstFailure != null) {
+         throw firstFailure;
+       }
+     }
+   }
+
+   /**
+    * Stops the container with the given id and removes it from the registry.
+    *
+    * @param id container id
+    * @throws IllegalArgumentException if no container with this id exists
+    */
+   public static void killProcess(String id) throws InterruptedException {
+     synchronized (processes) {
+       LocalProcess local = processes.get(id);
+       Preconditions.checkArgument(local != null, "Process '%s' does not exist", id);
+       local.process.stop();
+       processes.remove(id);
+     }
+   }
+
+   /** Registry entry: container id, name of the owning provider and the container itself. */
+   static class LocalProcess {
+     final String id;
+     final String owner;
+     final ContainerProcess process;
+
+     public LocalProcess(String id, String owner, ContainerProcess process) {
+       this.id = id;
+       this.owner = owner;
+       this.process = process;
+     }
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalStatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalStatusProvider.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalStatusProvider.java
new file mode 100644
index 0000000..7cb02bb
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalStatusProvider.java
@@ -0,0 +1,53 @@
+package org.apache.helix.autoscale.impl.local;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.helix.autoscale.StatusProviderService;
+import org.apache.helix.autoscale.impl.local.LocalContainerSingleton.LocalProcess;
+
+/**
+ * StatusProvider for VM-local containers spawned via
+ * {@link LocalContainerProvider}. Runnable and configurable service.
+ *
+ */
+ public class LocalStatusProvider implements StatusProviderService {
+
+   /** No configuration is needed; the registry is a VM-wide singleton. */
+   @Override
+   public void configure(Properties properties) throws Exception {
+     // left blank
+   }
+
+   @Override
+   public void start() throws Exception {
+     // left blank
+   }
+
+   @Override
+   public void stop() throws Exception {
+     // left blank
+   }
+
+   /** Returns whether a container with the given id is registered. */
+   @Override
+   public boolean exists(String id) {
+     final Map<String, LocalProcess> registry = LocalContainerSingleton.getProcesses();
+
+     synchronized (registry) {
+       return registry.containsKey(id);
+     }
+   }
+
+   /**
+    * Returns whether the container with the given id is registered and
+    * reports itself as active; unknown ids are reported as not healthy.
+    */
+   @Override
+   public boolean isHealthy(String id) {
+     final Map<String, LocalProcess> registry = LocalContainerSingleton.getProcesses();
+
+     synchronized (registry) {
+       final LocalProcess entry = registry.get(id);
+       return entry != null && entry.process.isActive();
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProcess.java
new file mode 100644
index 0000000..6847ac5
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProcess.java
@@ -0,0 +1,93 @@
+package org.apache.helix.autoscale.impl.shell;
+
+import java.io.File;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.autoscale.container.ContainerProcess;
+import org.apache.helix.autoscale.container.ContainerProcessProperties;
+import org.apache.helix.autoscale.container.ContainerUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Host process for Shell-based container. ContainerProcess configuration is
+ * read from path in first command-line argument. Status is maintained using
+ * temporary marker file. (Program entry point)
+ *
+ */
+class ShellContainerProcess {
+ static final Logger log = Logger.getLogger(ShellContainerProcess.class);
+
+ public static final long MONITOR_INTERVAL = 5000;
+
+ static String markerDir;
+ static ContainerProcess process;
+ static ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+
+ public static void main(String[] args) throws Exception {
+ final String propertiesPath = args[0];
+ markerDir = args[1];
+
+ ContainerProcessProperties properties = ContainerUtils.getPropertiesFromPath(propertiesPath);
+
+ process = ContainerUtils.createProcess(properties);
+
+ log.debug("Installing shutdown hooks");
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ log.debug("Running shutdown hook");
+ try {
+ ShellContainerProcess.stop();
+ } catch (Exception ignore) {
+ }
+ }
+ });
+
+ log.debug("Launching shell container process");
+ process.start();
+
+ ShellUtils.createMarker(new File(markerDir));
+
+ log.debug("Launching process monitor");
+ executor.scheduleAtFixedRate(new ProcessMonitor(), 0, MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+
+ static void stop() throws InterruptedException {
+ log.debug("Shutting down shell process");
+ if (process != null) {
+ process.stop();
+ ShellUtils.destroyMarker(new File(markerDir));
+ }
+ if (executor != null) {
+ executor.shutdownNow();
+ while (!executor.isTerminated()) {
+ Thread.sleep(100);
+ }
+ executor = null;
+ }
+ }
+
+ static class ProcessMonitor implements Runnable {
+ @Override
+ public void run() {
+ if (process.isFailed()) {
+ log.warn("detected process failure");
+ try {
+ ShellContainerProcess.stop();
+ } catch (Exception ignore) {
+ }
+ System.exit(1);
+ }
+ if (!process.isActive()) {
+ log.warn("detected process shutdown");
+ try {
+ ShellContainerProcess.stop();
+ } catch (Exception ignore) {
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProvider.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProvider.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProvider.java
new file mode 100644
index 0000000..df4c6ef
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProvider.java
@@ -0,0 +1,151 @@
+package org.apache.helix.autoscale.impl.shell;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.autoscale.ContainerProvider;
+import org.apache.helix.autoscale.ContainerProviderService;
+import org.apache.helix.autoscale.container.ContainerProcessProperties;
+import org.apache.helix.autoscale.impl.shell.ShellContainerSingleton.ShellProcess;
+import org.apache.helix.autoscale.provider.ProviderProperties;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.Files;
+
+/**
+ * {@link ContainerProvider} spawning shell-based containers. Only works in single-VM
+ * deployments as container metadata is managed via singleton.
+ *
+ * @see ShellContainerSingleton
+ */
+ class ShellContainerProvider implements ContainerProviderService {
+
+   static final Logger log = Logger.getLogger(ShellContainerProvider.class);
+
+   static final String RUN_COMMAND = "/bin/sh";
+
+   /** Milliseconds between two checks for the container's marker file. */
+   static final long POLL_INTERVAL = 1000;
+   /** Milliseconds a new container may take to write its marker file. */
+   static final long CONTAINER_TIMEOUT = 60000;
+
+   // global view of processes required
+   // NOTE(review): every method below shadows this field with the map from
+   // ShellContainerSingleton.getProcesses(), so it looks unused - confirm.
+   static final Map<String, ShellProcess> processes = new HashMap<String, ShellProcess>();
+
+   /** Container type name mapped to the properties used to instantiate it. */
+   final Map<String, Properties> types = new HashMap<String, Properties>();
+
+   String address;
+   String cluster;
+   String name;
+
+   /**
+    * Validates the given properties, stores address, cluster and provider
+    * name, and registers every container type they declare.
+    *
+    * @param properties provider configuration, must not be null
+    * @throws IllegalArgumentException if the properties are not valid
+    */
+   @Override
+   public void configure(Properties properties) throws Exception {
+     Preconditions.checkNotNull(properties);
+     ProviderProperties providerProperties = new ProviderProperties();
+     providerProperties.putAll(properties);
+     Preconditions.checkArgument(providerProperties.isValid(), "provider properties not valid (properties='%s')", properties);
+
+     this.address = providerProperties.getProperty("address");
+     this.cluster = providerProperties.getProperty("cluster");
+     this.name = providerProperties.getProperty("name");
+
+     for (String containerType : providerProperties.getContainers()) {
+       registerType(containerType, providerProperties.getContainer(containerType));
+     }
+   }
+
+   @Override
+   public void start() throws Exception {
+     // left blank
+   }
+
+   /** Destroys all containers owned by this provider. */
+   @Override
+   public void stop() throws Exception {
+     destroyAll();
+   }
+
+   /**
+    * Writes the container properties to a fresh temp directory, launches the
+    * container script via {@code /bin/sh}, registers the child process and
+    * waits until the container has written its marker file.
+    *
+    * @param id unique container id
+    * @param type name of a registered container type
+    * @throws IllegalStateException if the id is already in use or the type
+    *           is not registered
+    * @throws TimeoutException if no marker appears within
+    *           {@link #CONTAINER_TIMEOUT}; the process stays registered
+    */
+   @Override
+   public void create(String id, String type) throws Exception {
+     Map<String, ShellProcess> processes = ShellContainerSingleton.getProcesses();
+
+     // NOTE(review): the registry lock is held while polling for the marker
+     // (up to CONTAINER_TIMEOUT), blocking other users of the registry.
+     synchronized (processes) {
+       Preconditions.checkState(!processes.containsKey(id), "Process '%s' already exists", id);
+       Preconditions.checkState(types.containsKey(type), "Type '%s' is not registered", type);
+
+       ContainerProcessProperties properties = new ContainerProcessProperties(types.get(type));
+
+       properties.setProperty(ContainerProcessProperties.CLUSTER, cluster);
+       properties.setProperty(ContainerProcessProperties.NAME, id);
+       properties.setProperty(ContainerProcessProperties.ADDRESS, address);
+
+       File tmpDir = Files.createTempDir();
+       File tmpProperties = new File(tmpDir.getCanonicalPath() + File.separator + ShellUtils.SHELL_CONTAINER_PROPERTIES);
+       File tmpMarker = new File(tmpDir.getCanonicalPath());
+
+       // Properties.store() flushes but leaves the writer open; close it here
+       FileWriter writer = new FileWriter(tmpProperties);
+       try {
+         properties.store(writer, id);
+       } finally {
+         writer.close();
+       }
+
+       log.info(String.format("Running container '%s' (properties='%s')", id, properties));
+
+       log.debug(String.format("Invoking command '%s %s %s %s'", RUN_COMMAND, ShellUtils.SHELL_CONTAINER_PATH, tmpProperties.getCanonicalPath(),
+           tmpMarker.getCanonicalPath()));
+
+       ProcessBuilder builder = new ProcessBuilder();
+       builder.command(RUN_COMMAND, ShellUtils.SHELL_CONTAINER_PATH, tmpProperties.getCanonicalPath(), tmpMarker.getCanonicalPath());
+
+       Process process = builder.start();
+
+       processes.put(id, new ShellProcess(id, name, process, tmpDir));
+
+       long limit = System.currentTimeMillis() + CONTAINER_TIMEOUT;
+       while (!ShellUtils.hasMarker(tmpDir)) {
+         if (System.currentTimeMillis() >= limit) {
+           throw new TimeoutException(String.format("Container '%s' failed to reach active state", id));
+         }
+         Thread.sleep(POLL_INTERVAL);
+       }
+     }
+   }
+
+   /**
+    * Unregisters the container, terminates its process, waits for it to exit
+    * and deletes its temp directory. The directory is deleted even when
+    * waiting for the process is interrupted.
+    *
+    * @param id container id
+    * @throws IllegalArgumentException if no container with this id exists
+    */
+   @Override
+   public void destroy(String id) throws Exception {
+     Map<String, ShellProcess> processes = ShellContainerSingleton.getProcesses();
+
+     synchronized (processes) {
+       if (!processes.containsKey(id)) {
+         throw new IllegalArgumentException(String.format("Process '%s' does not exist", id));
+       }
+
+       log.info(String.format("Destroying container '%s'", id));
+
+       ShellProcess shell = processes.remove(id);
+       try {
+         shell.process.destroy();
+         shell.process.waitFor();
+       } finally {
+         FileUtils.deleteDirectory(shell.tmpDir);
+       }
+     }
+   }
+
+   /**
+    * Destroys every registered container whose owner is this provider.
+    * A failure to destroy one container is logged and does not prevent the
+    * remaining containers from being destroyed.
+    */
+   @Override
+   public void destroyAll() {
+     Map<String, ShellProcess> processes = ShellContainerSingleton.getProcesses();
+
+     synchronized (processes) {
+       log.info("Destroying all owned processes");
+       // iterate over a copy: destroy() removes entries from the registry
+       for (ShellProcess process : new HashSet<ShellProcess>(processes.values())) {
+         if (process.owner.equals(name)) {
+           try {
+             destroy(process.id);
+           } catch (Exception e) {
+             log.warn(String.format("Could not destroy container '%s'", process.id), e);
+           }
+         }
+       }
+     }
+   }
+
+   /** Registers (or replaces) the properties used for containers of type {@code name}. */
+   void registerType(String name, Properties properties) {
+     log.debug(String.format("Registering container type '%s' (properties='%s')", name, properties));
+     types.put(name, properties);
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProviderProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProviderProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProviderProcess.java
new file mode 100644
index 0000000..1148b4e
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProviderProcess.java
@@ -0,0 +1,45 @@
+package org.apache.helix.autoscale.impl.shell;
+
+import java.util.Properties;
+
+import org.apache.helix.autoscale.Service;
+import org.apache.helix.autoscale.provider.ProviderProcess;
+import org.apache.helix.autoscale.provider.ProviderProperties;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Configurable and runnable service that combines a
+ * {@link ShellContainerProvider} with the {@link ProviderProcess} using it.
+ *
+ */
+public class ShellContainerProviderProcess implements Service {
+  ShellContainerProvider provider;
+  ProviderProcess process;
+
+  /**
+   * Validates the given properties as {@link ProviderProperties}, then creates
+   * and configures the container provider and the provider process.
+   *
+   * @param properties
+   *          service configuration
+   * @throws IllegalArgumentException
+   *           if the properties are not valid provider properties
+   */
+  @Override
+  public void configure(Properties properties) throws Exception {
+    ProviderProperties providerProperties = new ProviderProperties();
+    providerProperties.putAll(properties);
+
+    Preconditions.checkArgument(providerProperties.isValid(), "provider properties not valid (properties='%s')", properties);
+
+    provider = new ShellContainerProvider();
+    provider.configure(properties);
+
+    process = new ProviderProcess();
+    process.configure(providerProperties);
+    process.setConteinerProvider(provider);
+  }
+
+  /**
+   * Starts the container provider first and the provider process second.
+   */
+  @Override
+  public void start() throws Exception {
+    provider.start();
+    process.start();
+  }
+
+  /**
+   * Stops the provider process first and the container provider second. The
+   * container provider is stopped even if stopping the process fails.
+   */
+  @Override
+  public void stop() throws Exception {
+    try {
+      process.stop();
+    } finally {
+      provider.stop();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerSingleton.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerSingleton.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerSingleton.java
new file mode 100644
index 0000000..a82baea
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerSingleton.java
@@ -0,0 +1,58 @@
+package org.apache.helix.autoscale.impl.shell;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Singleton tracking metadata for shell-based containers spawned via
+ * {@link ShellContainerProvider}.
+ *
+ */
+public class ShellContainerSingleton {
+ static final Map<String, ShellProcess> processes = new HashMap<String, ShellProcess>();
+
+ private ShellContainerSingleton() {
+ // left blank
+ }
+
+ public static Map<String, ShellProcess> getProcesses() {
+ return processes;
+ }
+
+ public static void reset() {
+ synchronized (processes) {
+ for (ShellProcess shell : processes.values()) {
+ shell.process.destroy();
+ try { shell.process.waitFor(); } catch(Exception ignore) {}
+ }
+ processes.clear();
+ }
+ }
+
+ public static void killProcess(String id) throws InterruptedException {
+ synchronized (processes) {
+ Preconditions.checkArgument(processes.containsKey(id), "Process '%s' does not exist", id);
+ Process process = processes.get(id).process;
+ process.destroy();
+ process.waitFor();
+ processes.remove(id);
+ }
+ }
+
+ static class ShellProcess {
+ final String id;
+ final String owner;
+ final Process process;
+ final File tmpDir;
+
+ public ShellProcess(String id, String owner, Process process, File tmpDir) {
+ this.id = id;
+ this.owner = owner;
+ this.process = process;
+ this.tmpDir = tmpDir;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellStatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellStatusProvider.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellStatusProvider.java
new file mode 100644
index 0000000..8094050
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellStatusProvider.java
@@ -0,0 +1,64 @@
+package org.apache.helix.autoscale.impl.shell;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.helix.autoscale.StatusProviderService;
+import org.apache.helix.autoscale.impl.shell.ShellContainerSingleton.ShellProcess;
+
+/**
+ * Status provider for shell-based containers spawned via
+ * {@link ShellContainerProvider}. Answers queries from the process map held
+ * by {@link ShellContainerSingleton}. Runnable and configurable service
+ * without any configuration or lifecycle work of its own.
+ *
+ */
+public class ShellStatusProvider implements StatusProviderService {
+
+  /**
+   * Tells whether a process is registered under the given id.
+   */
+  @Override
+  public boolean exists(String id) {
+    Map<String, ShellProcess> registry = ShellContainerSingleton.getProcesses();
+
+    synchronized (registry) {
+      return registry.containsKey(id);
+    }
+  }
+
+  /**
+   * A container is healthy if it is registered, its marker file is present
+   * and its process has not exited yet.
+   */
+  @Override
+  public boolean isHealthy(String id) {
+    Map<String, ShellProcess> registry = ShellContainerSingleton.getProcesses();
+
+    synchronized (registry) {
+      ShellProcess candidate = registry.get(id);
+
+      if (candidate == null || !ShellUtils.hasMarker(candidate.tmpDir)) {
+        return false;
+      }
+
+      return isRunning(candidate.process);
+    }
+  }
+
+  /**
+   * Probes the process via its exit value, which is only available after
+   * termination.
+   */
+  private static boolean isRunning(Process process) {
+    try {
+      process.exitValue();
+    } catch (IllegalThreadStateException stillRunning) {
+      // no exit value yet, the process is still alive
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public void configure(Properties properties) throws Exception {
+    // nothing to configure
+  }
+
+  @Override
+  public void start() throws Exception {
+    // nothing to start
+  }
+
+  @Override
+  public void stop() throws Exception {
+    // nothing to stop
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellUtils.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellUtils.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellUtils.java
new file mode 100644
index 0000000..02df0e0
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellUtils.java
@@ -0,0 +1,54 @@
+package org.apache.helix.autoscale.impl.shell;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Utility for creating and destroying temporary marker files for shell-based
+ * containers.
+ *
+ */
+class ShellUtils {
+
+ static final Logger log = Logger.getLogger(ShellUtils.class);
+
+ static final String SHELL_CONTAINER_PATH = "target/metamanager-pkg/bin/shell-container-process.sh";
+ static final String SHELL_CONTAINER_PROPERTIES = "container.properties";
+ static final String SHELL_CONTAINER_MARKER = "active";
+
+ private ShellUtils() {
+ // left blank
+ }
+
+ public static boolean hasMarker(File processDir) {
+ try {
+ log.debug(String.format("checking for marker file '%s'", getMarkerFile(processDir)));
+ if (getMarkerFile(processDir).exists())
+ return true;
+ } catch (IOException e) {
+ // ignore
+ }
+ return false;
+ }
+
+ public static void createMarker(File processDir) throws IOException {
+ log.debug(String.format("creating marker file '%s'", getMarkerFile(processDir)));
+ getMarkerFile(processDir).createNewFile();
+ }
+
+ public static void destroyMarker(File processDir) {
+ try {
+ log.debug(String.format("destroying marker file '%s'", getMarkerFile(processDir)));
+ getMarkerFile(processDir).delete();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+
+ public static File getMarkerFile(File processDir) throws IOException {
+ return new File(processDir.getCanonicalPath() + File.separatorChar + SHELL_CONTAINER_MARKER);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerData.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerData.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerData.java
new file mode 100644
index 0000000..4ebfb5d
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerData.java
@@ -0,0 +1,86 @@
+package org.apache.helix.autoscale.impl.yarn;
+
+/**
+ * Container meta data for YARN-based containers. Reflects the lifecycle of a
+ * container from requesting, to bootstrapping, active operation and shutdown.
+ * Read and written by {@link YarnMasterProcess}, {@link YarnContainerProvider}
+ * and {@link YarnContainerService}. Also read by {@link YarnStatusProvider}.
+ * Typically stored in zookeeper. All setters return this instance to allow
+ * call chaining.
+ *
+ */
+class YarnContainerData {
+
+  /**
+   * Lifecycle states of a container.
+   */
+  enum ContainerState {
+    ACQUIRE,
+    CONNECTING,
+    ACTIVE,
+    TEARDOWN,
+    FAILED,
+    HALTED,
+    FINALIZE
+  }
+
+  String id;
+  ContainerState state;
+  int yarnId;
+  String owner;
+  YarnContainerProcessProperties properties;
+
+  /**
+   * Creates an empty instance with all fields at their Java default values.
+   */
+  public YarnContainerData() {
+    // left blank
+  }
+
+  /**
+   * Creates meta data for a new container in state
+   * {@link ContainerState#ACQUIRE} with the yarn id set to -1.
+   *
+   * @param id
+   *          container id
+   * @param owner
+   *          owner of the container
+   * @param properties
+   *          container process configuration
+   */
+  public YarnContainerData(String id, String owner, YarnContainerProcessProperties properties) {
+    this.id = id;
+    this.owner = owner;
+    this.properties = properties;
+    this.state = ContainerState.ACQUIRE;
+    this.yarnId = -1;
+  }
+
+  /** @return the container id */
+  public String getId() {
+    return this.id;
+  }
+
+  /** Sets the container id. */
+  public YarnContainerData setId(String id) {
+    this.id = id;
+    return this;
+  }
+
+  /** @return the current lifecycle state */
+  public ContainerState getState() {
+    return this.state;
+  }
+
+  /** Sets the lifecycle state. */
+  public YarnContainerData setState(ContainerState state) {
+    this.state = state;
+    return this;
+  }
+
+  /** @return the yarn id */
+  public int getYarnId() {
+    return this.yarnId;
+  }
+
+  /** Sets the yarn id. */
+  public YarnContainerData setYarnId(int yarnId) {
+    this.yarnId = yarnId;
+    return this;
+  }
+
+  /** @return the owner of the container */
+  public String getOwner() {
+    return this.owner;
+  }
+
+  /** Sets the owner of the container. */
+  public YarnContainerData setOwner(String owner) {
+    this.owner = owner;
+    return this;
+  }
+
+  /** @return the container process configuration */
+  public YarnContainerProcessProperties getProperties() {
+    return this.properties;
+  }
+
+  /** Sets the container process configuration. */
+  public YarnContainerData setProperties(YarnContainerProcessProperties properties) {
+    this.properties = properties;
+    return this;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProcess.java
new file mode 100644
index 0000000..5f8d006
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProcess.java
@@ -0,0 +1,53 @@
+package org.apache.helix.autoscale.impl.yarn;
+
+import org.apache.helix.autoscale.container.ContainerProcess;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Host process for {@link ContainerProcess}es spawned by
+ * {@link YarnContainerProvider}. Configured via *.properties file in working
+ * directory. Corresponds to regular container in YARN and is managed jointly by
+ * the application master and the Helix participant. (Program entry point)
+ *
+ */
+class YarnContainerProcess {
+  static final Logger log = Logger.getLogger(YarnContainerProcess.class);
+
+  /**
+   * Loads and validates the container properties, starts the yarn data
+   * provider and the yarn container service, installs a shutdown hook that
+   * stops both, and then blocks on standard input. The JVM exits with status
+   * 0 once the read from standard input returns.
+   *
+   * @param args
+   *          ignored
+   * @throws IllegalArgumentException
+   *           if the loaded container properties are not valid
+   */
+  public static void main(String[] args) throws Exception {
+    log.trace("BEGIN YarnProcess.main()");
+
+    final YarnContainerProcessProperties properties = YarnUtils.createContainerProcessProperties(YarnUtils
+        .getPropertiesFromPath(YarnUtils.YARN_CONTAINER_PROPERTIES));
+    Preconditions.checkArgument(properties.isValid(), "container properties not valid: %s", properties.toString());
+
+    log.debug("Launching yarndata service");
+    final ZookeeperYarnDataProvider metaService = new ZookeeperYarnDataProvider(properties.getYarnData());
+    metaService.start();
+
+    log.debug("Launching yarn container service");
+    final YarnContainerService yarnService = new YarnContainerService();
+    yarnService.configure(properties);
+    yarnService.setYarnDataProvider(metaService);
+    yarnService.start();
+
+    log.debug("Installing shutdown hooks");
+    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+      @Override
+      public void run() {
+        log.debug("Running shutdown hook");
+        try {
+          yarnService.stop();
+        } finally {
+          // stop the data provider even if stopping the container service fails
+          metaService.stop();
+        }
+      }
+    }));
+
+    // NOTE(review): read() also returns at end-of-stream, so a closed standard
+    // input would shut the container down immediately - confirm that the
+    // launch environment keeps standard input open.
+    System.out.println("Press ENTER to stop container process");
+    System.in.read();
+
+    log.debug("Stopping container services");
+    // Trace before exiting: System.exit() does not return, the services are
+    // stopped by the shutdown hook installed above.
+    log.trace("END YarnProcess.main()");
+    System.exit(0);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProcessProperties.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProcessProperties.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProcessProperties.java
new file mode 100644
index 0000000..5ad8f63
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProcessProperties.java
@@ -0,0 +1,40 @@
+package org.apache.helix.autoscale.impl.yarn;
+
+import org.apache.helix.autoscale.container.ContainerProcessProperties;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Base configuration for {@link YarnContainerProcess}. Extends the container
+ * process configuration by the mandatory {@link #YARNDATA} entry. Lookups of
+ * keys that are not set fail with an {@link IllegalStateException} instead of
+ * returning null.
+ *
+ */
+public class YarnContainerProcessProperties extends ContainerProcessProperties {
+  private static final long serialVersionUID = -2209509977839674160L;
+
+  /** key of the yarn data entry */
+  public final static String YARNDATA = "yarndata";
+
+  /**
+   * @return true if the parent configuration is valid and the
+   *         {@link #YARNDATA} entry is set
+   */
+  @Override
+  public boolean isValid() {
+    return super.isValid() &&
+        containsKey(YARNDATA);
+  }
+
+  /**
+   * @return value of the {@link #YARNDATA} entry
+   * @throws IllegalStateException
+   *           if the entry is not set
+   */
+  public String getYarnData() {
+    return getProperty(YARNDATA);
+  }
+
+  /**
+   * @throws IllegalStateException
+   *           if the key is not set
+   */
+  @Override
+  public Object get(Object key) {
+    Preconditions.checkState(containsKey(key), "property '%s' is not set", key);
+    return super.get(key);
+  }
+
+  /**
+   * @throws IllegalStateException
+   *           if the key is not set
+   */
+  @Override
+  public String getProperty(String key) {
+    Preconditions.checkState(containsKey(key), "property '%s' is not set", key);
+    return super.getProperty(key);
+  }
+
+}