You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/09/20 20:30:21 UTC

[12/15] Adding Helix-task-framework and Yarn integration modules

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/bootstrapper/ZookeeperService.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/bootstrapper/ZookeeperService.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/bootstrapper/ZookeeperService.java
new file mode 100644
index 0000000..8d78f9b
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/bootstrapper/ZookeeperService.java
@@ -0,0 +1,64 @@
+package org.apache.helix.autoscale.bootstrapper;
+
+import java.io.File;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.autoscale.Service;
+import org.apache.log4j.Logger;
+
+/**
+ * Bootstrapping zookeeper. Convenience tool for creating standalone zookeeper
+ * instance for test deployments. For production use a separate zookeeper
+ * cluster is strongly recommended.
+ * 
+ */
+public class ZookeeperService implements Service {
+
+    static final Logger log = Logger.getLogger(ZookeeperService.class);
+
+    String              dataDir;
+    String              logDir;
+    int                 port;
+
+    ZkServer            server;
+
+    @Override
+    public void configure(Properties properties) throws Exception {
+        dataDir = properties.getProperty("datadir", "/tmp/zk/data");
+        logDir = properties.getProperty("logdir", "/tmp/zk/log");
+        port = Integer.parseInt(properties.getProperty("port", "2199"));
+    }
+
+    @Override
+    public void start() {
+        log.info(String.format("starting zookeeper service (dataDir='%s', logDir='%s', port=%d)", dataDir, logDir, port));
+
+        FileUtils.deleteQuietly(new File(dataDir));
+        FileUtils.deleteQuietly(new File(logDir));
+
+        IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+            @Override
+            public void createDefaultNameSpace(ZkClient zkClient) {
+                // left blank
+            }
+        };
+
+        server = new ZkServer(dataDir, logDir, defaultNameSpace, port);
+        server.start();
+    }
+
+    @Override
+    public void stop() {
+        log.info("stopping zookeeper service");
+
+        if (server != null) {
+            server.shutdown();
+            server = null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerProcess.java
new file mode 100644
index 0000000..343e426
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerProcess.java
@@ -0,0 +1,133 @@
+package org.apache.helix.autoscale.container;
+
+import java.util.Properties;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.autoscale.Service;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Base service for spawn-able container types. Configure from Properties and
+ * instantiates Helix participant to managed cluster.
+ * 
+ */
+public abstract class ContainerProcess implements Service {
+    static final Logger                             log    = Logger.getLogger(ContainerProcess.class);
+
+    private ContainerProcessProperties              properties;
+    private HelixManager                            participantManager;
+
+    private String                                  modelName;
+    private StateModelFactory<? extends StateModel> modelFactory;
+
+    private String                                  instanceName;
+    private String                                  clusterName;
+    private String                                  zookeeperAddress;
+
+    private boolean                                 active = false;
+    private boolean                                 failed = false;
+
+    public final void setModelName(String modelName) {
+        this.modelName = modelName;
+    }
+
+    public final void setModelFactory(StateModelFactory<? extends StateModel> modelFactory) {
+        this.modelFactory = modelFactory;
+    }
+
+    @Override
+    public void configure(Properties properties) throws Exception {
+        ContainerProcessProperties containerProps = new ContainerProcessProperties();
+        containerProps.putAll(properties);
+        Preconditions.checkArgument(containerProps.isValid());
+
+        this.properties = containerProps;
+        this.instanceName = containerProps.getName();
+        this.clusterName = containerProps.getCluster();
+        this.zookeeperAddress = containerProps.getAddress();
+    }
+
+    @Override
+    public final void start() {
+        try {
+            Preconditions.checkNotNull(modelName, "state model name not set");
+            Preconditions.checkNotNull(modelFactory, "state model factory not set");
+            Preconditions.checkState(properties.isValid(), "process properties not valid: %s", properties.toString());
+
+            log.info(String.format("starting container '%s'", instanceName));
+            startContainer();
+
+            log.info(String.format("starting helix participant '%s'", instanceName));
+            startParticipant();
+
+            active = true;
+
+        } catch (Exception e) {
+            log.error(String.format("starting container '%s' failed", instanceName), e);
+            fail();
+        }
+    }
+
+    protected abstract void startContainer() throws Exception;
+
+    private final void startParticipant() throws Exception {
+        participantManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, zookeeperAddress);
+        participantManager.getStateMachineEngine().registerStateModelFactory(modelName, modelFactory);
+        participantManager.connect();
+    }
+
+    @Override
+    public final void stop() {
+        try {
+            log.info(String.format("stopping helix participant '%s'", instanceName));
+            stopParticipant();
+
+            log.info(String.format("stopping container '%s'", instanceName));
+            stopContainer();
+
+            active = false;
+
+        } catch (Exception e) {
+            log.warn(String.format("stopping container '%s' failed", instanceName), e);
+        }
+    }
+
+    protected abstract void stopContainer() throws Exception;
+
+    private final void stopParticipant() {
+        if (participantManager != null) {
+            participantManager.disconnect();
+        }
+    }
+
+    public final void fail() {
+        failed = true;
+    }
+
+    public final boolean isActive() {
+        return active && !failed;
+    }
+
+    public final boolean isFailed() {
+        return failed;
+    }
+
+    public final ContainerProcessProperties getProperties() {
+        return properties;
+    }
+
+    String getModelName() {
+        return modelName;
+    }
+
+    StateModelFactory<? extends StateModel> getModelFactory() {
+        return modelFactory;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerProcessProperties.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerProcessProperties.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerProcessProperties.java
new file mode 100644
index 0000000..1096174
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerProcessProperties.java
@@ -0,0 +1,66 @@
+package org.apache.helix.autoscale.container;
+
+import java.util.Properties;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Base configuration for ContainerProcess. 
+ * 
+ */
+public class ContainerProcessProperties extends Properties {
+    /**
+	 * 
+	 */
+    private static final long  serialVersionUID = 5754863079470995536L;
+
+    public static final String CLUSTER          = "cluster";
+    public static final String ADDRESS          = "address";
+    public static final String NAME             = "name";
+    public static final String CONTAINER_CLASS  = "class";
+
+    public ContainerProcessProperties() {
+        // left blank
+    }
+
+    public ContainerProcessProperties(Properties properties) {
+        Preconditions.checkNotNull(properties);
+        putAll(properties);
+    }
+	
+	public boolean isValid() {
+		return containsKey(CLUSTER) &&
+			   containsKey(NAME) &&
+			   containsKey(ADDRESS) &&
+			   containsKey(CONTAINER_CLASS);
+	}
+	
+    public String getCluster() {
+        return getProperty(CLUSTER);
+    }
+
+    public String getAddress() {
+        return getProperty(ADDRESS);
+    }
+
+    public String getName() {
+        return getProperty(NAME);
+    }
+
+    public String getContainerClass() {
+        return getProperty(CONTAINER_CLASS);
+    }
+
+    @Override
+    public synchronized Object get(Object key) {
+        Preconditions.checkState(containsKey(key));
+        return super.get(key);
+    }
+
+    @Override
+    public String getProperty(String key) {
+        Preconditions.checkState(containsKey(key));
+        return super.getProperty(key);
+    }
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerUtils.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerUtils.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerUtils.java
new file mode 100644
index 0000000..8bab01e
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/container/ContainerUtils.java
@@ -0,0 +1,46 @@
+package org.apache.helix.autoscale.container;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.Constructor;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Utility for loading ContainerProperties and spawning ContainerProcess.
+ * 
+ */
+public class ContainerUtils {
+
+    static final Logger log = Logger.getLogger(ContainerUtils.class);
+
+    private ContainerUtils() {
+        // left blank
+    }
+
+    public static ContainerProcess createProcess(ContainerProcessProperties properties) throws Exception {
+        String containerClassName = properties.getContainerClass();
+
+        Class<?> containerClass = Class.forName(containerClassName);
+
+        log.debug(String.format("checking for properties constructor in class '%s'", containerClassName));
+
+        Constructor<?> constructor = containerClass.getConstructor(ContainerProcessProperties.class);
+
+        return (ContainerProcess) constructor.newInstance(properties);
+    }
+
+    public static ContainerProcessProperties getPropertiesFromResource(String resourceName) throws IOException {
+        ContainerProcessProperties properties = new ContainerProcessProperties();
+        properties.load(ClassLoader.getSystemResourceAsStream(resourceName));
+        return properties;
+    }
+
+    public static ContainerProcessProperties getPropertiesFromPath(String filePath) throws IOException {
+        ContainerProcessProperties properties = new ContainerProcessProperties();
+        properties.load(new InputStreamReader(new FileInputStream(filePath)));
+        return properties;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/FileTargetProvider.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/FileTargetProvider.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/FileTargetProvider.java
new file mode 100644
index 0000000..ebbf4b6
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/FileTargetProvider.java
@@ -0,0 +1,51 @@
+package org.apache.helix.autoscale.impl;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.helix.autoscale.TargetProviderService;
+
+/**
+ * File-based target model. Container count is extracted from properties file. Count may change dynamically.
+ * 
+ */
+public class FileTargetProvider implements TargetProviderService {
+
+    File file;
+
+    public FileTargetProvider() {
+        // left blank
+    }
+
+    public FileTargetProvider(String path) {
+        this.file = new File(path);
+    }
+
+    @Override
+    public int getTargetContainerCount(String containerType) throws FileNotFoundException, IOException, IllegalArgumentException {
+        Properties properties = new Properties();
+        properties.load(new FileReader(file));
+        if (!properties.contains(containerType))
+            throw new IllegalArgumentException(String.format("container type '%s' not found in '%s'", containerType, file.getCanonicalPath()));
+        return Integer.parseInt((String) properties.get(containerType));
+    }
+
+    @Override
+    public void configure(Properties properties) throws Exception {
+        this.file = new File(properties.getProperty("path"));
+    }
+
+    @Override
+    public void start() throws Exception {
+        // left blank
+    }
+
+    @Override
+    public void stop() throws Exception {
+        // left blank
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/RedisTargetProvider.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/RedisTargetProvider.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/RedisTargetProvider.java
new file mode 100644
index 0000000..723ac4d
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/RedisTargetProvider.java
@@ -0,0 +1,356 @@
+package org.apache.helix.autoscale.impl;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.autoscale.TargetProviderService;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Redis-specific target model based on recurring Tps benchmarking. Tps target
+ * and probed redis-server instances are configured via zookeeper. Tps target
+ * may change dynamically.
+ * 
+ */
+public class RedisTargetProvider implements TargetProviderService {
+
+    static final Logger        log               = Logger.getLogger(RedisTargetProvider.class);
+
+    public static final String BENCHMARK_COMMAND = "redis-benchmark";
+    public static final String BENCHMARK_TESTS   = "GET,SET";
+
+    public static final String DEFAULT_RECORDS   = "100000";
+    public static final String DEFAULT_CLIENTS   = "20";
+    public static final String DEFAULT_REQUESTS  = "100000";
+    public static final String DEFAULT_TIMEOUT   = "8000";
+    public static final String DEFAULT_INTERVAL  = "10000";
+    public static final String DEFAULT_ALPHA     = "0.25";
+
+    ZkClient                   zookeeper;
+
+    String                     address;
+    String                     root;
+
+    int                        records;
+    int                        clients;
+    int                        requests;
+    int                        timeout;
+    int                        interval;
+
+    int                        targetTpsGet;
+    int                        targetTpsSet;
+
+    int                        targetCountMin;
+    int                        targetCountMax;
+    int                        targetCount;
+
+    double                     alpha;
+    double                     averageTpsGet;
+    double                     averageTpsSet;
+    double                     averageCount;
+
+    ScheduledExecutorService   executor;
+
+    @Override
+    public void configure(Properties properties) {
+        address = properties.getProperty("address");
+        root = properties.getProperty("root");
+        targetTpsGet = Integer.valueOf(properties.getProperty("get", "0"));
+        targetTpsSet = Integer.valueOf(properties.getProperty("set", "0"));
+        targetCountMin = Integer.valueOf(properties.getProperty("min", "-1"));
+        targetCountMax = Integer.valueOf(properties.getProperty("max", "-1"));
+        records = Integer.valueOf(properties.getProperty("records", DEFAULT_RECORDS));
+        clients = Integer.valueOf(properties.getProperty("clients", DEFAULT_CLIENTS));
+        requests = Integer.valueOf(properties.getProperty("requests", DEFAULT_REQUESTS));
+        timeout = Integer.valueOf(properties.getProperty("timeout", DEFAULT_TIMEOUT));
+        interval = Integer.valueOf(properties.getProperty("interval", DEFAULT_INTERVAL));
+        alpha = Double.valueOf(properties.getProperty("alpha", DEFAULT_ALPHA));
+    }
+
+    @Override
+    public void start() {
+        log.debug("starting redis status service");
+        zookeeper = new ZkClient(address);
+        zookeeper.createPersistent("/" + root, true);
+
+        try { zookeeper.createPersistent("/" + root + "/target.get", String.valueOf(targetTpsGet)); } catch (Exception ignore) {}
+        try { zookeeper.createPersistent("/" + root + "/target.set", String.valueOf(targetTpsSet)); } catch (Exception ignore) {}
+        try { zookeeper.createPersistent("/" + root + "/target.min", String.valueOf(targetCountMin)); } catch (Exception ignore) {}
+        try { zookeeper.createPersistent("/" + root + "/target.max", String.valueOf(targetCountMax)); } catch (Exception ignore) {}
+ 
+        executor = Executors.newSingleThreadScheduledExecutor();
+        executor.scheduleAtFixedRate(new RedisBenchmarkRunnable(), 0, interval, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void stop() {
+        log.debug("stopping redis status service");
+        if (executor != null) {
+            executor.shutdownNow();
+            while (!executor.isTerminated()) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+            executor = null;
+        }
+        if (zookeeper != null) {
+            zookeeper.close();
+            zookeeper = null;
+        }
+    }
+
+    @Override
+    public int getTargetContainerCount(String containerType) throws Exception {
+        return targetCount;
+    }
+
+    private class RedisBenchmarkRunnable implements Runnable {
+        ExecutorService executor = Executors.newCachedThreadPool();
+        RedisResult     aggregateResult;
+
+        @Override
+        public void run() {
+            log.debug("running redis benchmark");
+
+            aggregateResult = new RedisResult(0);
+            Collection<Future<RedisResult>> futures = new ArrayList<Future<RedisResult>>();
+
+            try {
+                Collection<RedisTarget> targets = getTargets();
+
+                // start benchmark
+                for (RedisTarget target : targets) {
+                    log.debug(String.format("submitting target '%s'", target));
+                    Future<RedisResult> future = executor.submit(new RedisCallable(target));
+                    futures.add(future);
+                }
+
+                // aggregate results
+                try {
+                    log.debug("waiting for results");
+
+                    long limit = System.currentTimeMillis() + timeout;
+                    for (Future<RedisResult> future : futures) {
+                        try {
+                            RedisResult result = future.get(limit - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+                            log.debug(String.format("got result '%s'", result));
+                            aggregate(result);
+                        } catch (Exception e) {
+                            log.warn(String.format("failed to get result"));
+                            future.cancel(true);
+                        }
+                    }
+                } catch (Exception e) {
+                    log.error("Error running redis benchmark", e);
+
+                    for (Future<RedisResult> future : futures) {
+                        future.cancel(true);
+                    }
+
+                    return;
+                }
+
+                // compare to thresholds
+                log.debug(String.format("aggregate result is '%s'", aggregateResult));
+
+                // get target from zookeeper
+                try { targetTpsGet = Integer.valueOf(zookeeper.<String> readData("/" + root + "/target.get")); } catch (Exception ignore) {}
+                try { targetTpsSet = Integer.valueOf(zookeeper.<String> readData("/" + root + "/target.set")); } catch (Exception ignore) {}
+                try { targetCountMin = Integer.valueOf(zookeeper.<String> readData("/" + root + "/target.min")); } catch (Exception ignore) {}
+                try { targetCountMax = Integer.valueOf(zookeeper.<String> readData("/" + root + "/target.max")); } catch (Exception ignore) {}
+
+                averageCount = alpha * aggregateResult.serverCount + (1.0 - alpha) * averageCount;
+
+                // calculate counts
+                int targetCountGet = -1;
+                if (aggregateResult.containsKey("GET")) {
+                    double tpsTarget = targetTpsGet;
+                    double tps = aggregateResult.get("GET");
+
+                    averageTpsGet = alpha * tps + (1.0 - alpha) * averageTpsGet;
+
+                    targetCountGet = (int) Math.ceil(tpsTarget / averageTpsGet * averageCount);
+                    log.debug(String.format("count.get=%d, target.get=%f, tps.get=%f, tps.avg.get=%f, count.avg=%f", targetCountGet, tpsTarget, tps,
+                            averageTpsGet, averageCount));
+                }
+
+                int targetCountSet = -1;
+                if (aggregateResult.containsKey("SET")) {
+                    double tpsTarget = targetTpsSet;
+                    double tps = aggregateResult.get("SET");
+
+                    averageTpsSet = alpha * tps + (1.0 - alpha) * averageTpsSet;
+
+                    targetCountSet = (int) Math.ceil(tpsTarget / averageTpsSet * averageCount);
+                    log.debug(String.format("count.set=%d, target.set=%f, tps.set=%f, tps.avg.set=%f, count.avg=%f", targetCountSet, tpsTarget, tps,
+                            averageTpsSet, averageCount));
+                }
+
+                targetCount = Math.max(targetCountGet, targetCountSet);
+
+                if (targetCountMin > 0)
+                    targetCount = Math.max(targetCount, targetCountMin);
+                if (targetCountMax > 0)
+                    targetCount = Math.min(targetCount, targetCountMax);
+
+                targetCount = Math.max(targetCount, 1);
+
+                log.debug(String.format("target count is %d", targetCount));
+                RedisTargetProvider.this.targetCount = targetCount;
+
+            } catch (Exception e) {
+                log.error("Error running redis benchmark", e);
+
+                for (Future<RedisResult> future : futures) {
+                    future.cancel(true);
+                }
+            }
+
+        }
+
+        Collection<RedisTarget> getTargets() {
+            log.debug("fetching redis servers from zookeeper");
+            Collection<RedisTarget> targets = new ArrayList<RedisTarget>();
+            Collection<String> servers = zookeeper.getChildren("/" + root);
+
+            servers.remove("target.get");
+            servers.remove("target.set");
+            servers.remove("target.min");
+            servers.remove("target.max");
+
+            for (String server : servers) {
+                if (!zookeeper.exists("/" + root + "/" + server + "/heartbeat"))
+                    continue;
+
+                String hostname = zookeeper.readData("/" + root + "/" + server + "/hostname");
+                int port = Integer.valueOf(zookeeper.<String> readData("/" + root + "/" + server + "/port"));
+
+                targets.add(new RedisTarget(hostname, port));
+            }
+
+            log.debug(String.format("found %d servers: %s", targets.size(), targets));
+            return targets;
+        }
+
+        void aggregate(RedisResult result) {
+            RedisResult newResult = new RedisResult(aggregateResult.serverCount + result.serverCount);
+
+            for (Entry<String, Double> entry : result.entrySet()) {
+                double current = 0.0d;
+                if (aggregateResult.containsKey(entry.getKey()))
+                    current = aggregateResult.get(entry.getKey());
+
+                current += entry.getValue();
+                newResult.put(entry.getKey(), current);
+            }
+
+            aggregateResult = newResult;
+        }
+    }
+
+    private static class RedisTarget {
+        final String hostname;
+        final int    port;
+
+        public RedisTarget(String hostname, int port) {
+            this.hostname = hostname;
+            this.port = port;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("%s:%d", hostname, port);
+        }
+    }
+
+    private static class RedisResult extends HashMap<String, Double> {
+        /**
+         * 
+         */
+        private static final long serialVersionUID = 4599748807597500952L;
+
+        final int                 serverCount;
+
+        public RedisResult(int serverCount) {
+            this.serverCount = serverCount;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("[serverCount=%d %s]", serverCount, super.toString());
+        }
+    }
+
+    private class RedisCallable implements Callable<RedisResult> {
+        final RedisTarget target;
+
+        public RedisCallable(RedisTarget target) {
+            this.target = target;
+        }
+
+        @Override
+        public RedisResult call() throws Exception {
+            log.debug(String.format("executing benchmark for '%s'", target));
+
+            ProcessBuilder builder = new ProcessBuilder();
+            builder.command(BENCHMARK_COMMAND, "-h", target.hostname, "-p", String.valueOf(target.port), "-r", String.valueOf(records), "-n",
+                    String.valueOf(requests), "-c", String.valueOf(clients), "-t", BENCHMARK_TESTS, "--csv");
+            Process process = builder.start();
+
+            log.debug(String.format("running '%s'", builder.command()));
+
+            RedisResult result = new RedisResult(1);
+
+            int retVal;
+            try {
+                retVal = process.waitFor();
+            } catch (InterruptedException e) {
+                process.destroy();
+                return result;
+            }
+
+            Preconditions.checkState(retVal == 0, "Benchmark process returned %s", retVal);
+
+            Pattern pattern = Pattern.compile("\"([A-Z0-9_]+).*\",\"([0-9\\.]+)\"");
+
+            log.debug("parsing output");
+            BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+            String line = null;
+            while ((line = reader.readLine()) != null) {
+                Matcher matcher = pattern.matcher(line);
+
+                if (!matcher.find())
+                    continue;
+
+                String key = matcher.group(1);
+                Double value = Double.valueOf(matcher.group(2));
+
+                result.put(key, value);
+            }
+
+            log.debug(String.format("benchmark for '%s' returned '%s'", target, result));
+
+            return result;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/StaticTargetProvider.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/StaticTargetProvider.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/StaticTargetProvider.java
new file mode 100644
index 0000000..346f0fe
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/StaticTargetProvider.java
@@ -0,0 +1,62 @@
+package org.apache.helix.autoscale.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.helix.autoscale.TargetProviderService;
+import org.apache.log4j.Logger;
+
+/**
+ * Target model based on manually set count. Count may change dynamically.
+ * 
+ */
+public class StaticTargetProvider implements TargetProviderService {
+    static final Logger        log          = Logger.getLogger(StaticTargetProvider.class);
+
+    final Map<String, Integer> targetCounts = new HashMap<String, Integer>();
+
+    public StaticTargetProvider() {
+        // left blank
+    }
+
+    public StaticTargetProvider(Map<String, Integer> targetCounts) {
+        this.targetCounts.putAll(targetCounts);
+    }
+
+    @Override
+    public int getTargetContainerCount(String containerType) {
+        return targetCounts.get(containerType);
+    }
+
+    public void setTargetContainerCount(String containerType, int targetCount) {
+        targetCounts.put(containerType, targetCount);
+    }
+
+    @Override
+    public void configure(Properties properties) throws Exception {
+        for (Entry<Object, Object> entry : properties.entrySet()) {
+            String key = (String) entry.getKey();
+
+            try {
+                int value = Integer.valueOf((String) entry.getValue());
+                log.debug(String.format("Inserting value '%s = %d'", key, value));
+                targetCounts.put(key, value);
+            } catch (NumberFormatException e) {
+                log.warn(String.format("Skipping '%s', not an integer (value='%s')", key, (String) entry.getValue()));
+            }
+        }
+    }
+
+    @Override
+    public void start() throws Exception {
+        // left blank
+    }
+
+    @Override
+    public void stop() throws Exception {
+        // left blank
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/DummyMasterSlaveProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/DummyMasterSlaveProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/DummyMasterSlaveProcess.java
new file mode 100644
index 0000000..683249d
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/DummyMasterSlaveProcess.java
@@ -0,0 +1,76 @@
+package org.apache.helix.autoscale.impl.container;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.autoscale.container.ContainerProcess;
+import org.apache.helix.autoscale.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * Sample implementation of container with Helix participant for MasterSlave
+ * state model. Print state transitions only.
+ * 
+ */
+public class DummyMasterSlaveProcess extends ContainerProcess {
+
+    static final Logger log = Logger.getLogger(DummyMasterSlaveProcess.class);
+
+    public DummyMasterSlaveProcess(ContainerProcessProperties properties) throws Exception {
+        configure(properties);
+        setModelName("MasterSlave");
+        setModelFactory(new DummyMasterSlaveModelFactory());
+    }
+
+    @Override
+    protected void startContainer() throws Exception {
+        log.info("starting dummy process container");
+    }
+
+    @Override
+    protected void stopContainer() throws Exception {
+        log.info("stopping dummy process container");
+    }
+
+    public static class DummyMasterSlaveModelFactory extends StateModelFactory<DummyMasterSlaveStateModel> {
+        @Override
+        public DummyMasterSlaveStateModel createNewStateModel(String partitionName) {
+            return new DummyMasterSlaveStateModel();
+        }
+    }
+
+    @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "SLAVE", "MASTER", "DROPPED" })
+    public static class DummyMasterSlaveStateModel extends StateModel {
+
+        static final Logger log = Logger.getLogger(DummyMasterSlaveStateModel.class);
+
+        @Transition(from = "OFFLINE", to = "SLAVE")
+        public void offlineToSlave(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from OFFLINE to SLAVE", context.getManager().getInstanceName()));
+        }
+
+        @Transition(from = "SLAVE", to = "OFFLINE")
+        public void slaveToOffline(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from SLAVE to OFFLINE", context.getManager().getInstanceName()));
+        }
+
+        @Transition(from = "SLAVE", to = "MASTER")
+        public void slaveToMaster(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from SLAVE to MASTER", context.getManager().getInstanceName()));
+        }
+
+        @Transition(from = "MASTER", to = "SLAVE")
+        public void masterToSlave(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from MASTER to SLAVE", context.getManager().getInstanceName()));
+        }
+
+        @Transition(from = "OFFLINE", to = "DROPPED")
+        public void offlineToDropped(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from OFFLINE to DROPPED", context.getManager().getInstanceName()));
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/DummyOnlineOfflineProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/DummyOnlineOfflineProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/DummyOnlineOfflineProcess.java
new file mode 100644
index 0000000..a0aad8e
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/DummyOnlineOfflineProcess.java
@@ -0,0 +1,66 @@
+package org.apache.helix.autoscale.impl.container;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.autoscale.container.ContainerProcess;
+import org.apache.helix.autoscale.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * Sample implementation of container with Helix participant for OnlineOffline
+ * state model. Print state transitions only.
+ * 
+ */
+public class DummyOnlineOfflineProcess extends ContainerProcess {
+
+    static final Logger log = Logger.getLogger(DummyOnlineOfflineProcess.class);
+
+    public DummyOnlineOfflineProcess(ContainerProcessProperties properties) throws Exception {
+        configure(properties);
+        setModelName("OnlineOffline");
+        setModelFactory(new DummyOnlineOfflineModelFactory());
+    }
+
+    @Override
+    protected void startContainer() throws Exception {
+        log.info("starting dummy online-offline process container");
+    }
+
+    @Override
+    protected void stopContainer() throws Exception {
+        log.info("stopping dummy online-offline process container");
+    }
+
+    public static class DummyOnlineOfflineModelFactory extends StateModelFactory<DummyOnlineOfflineStateModel> {
+        @Override
+        public DummyOnlineOfflineStateModel createNewStateModel(String partitionName) {
+            return new DummyOnlineOfflineStateModel();
+        }
+    }
+
+    @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "ONLINE", "DROPPED" })
+    public static class DummyOnlineOfflineStateModel extends StateModel {
+
+        static final Logger log = Logger.getLogger(DummyOnlineOfflineStateModel.class);
+
+        @Transition(from = "OFFLINE", to = "ONLINE")
+        public void offlineToOnline(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from OFFLINE to ONLINE", context.getManager().getInstanceName()));
+        }
+
+        @Transition(from = "ONLINE", to = "OFFLINE")
+        public void onlineToOffline(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from ONLINE to OFFLINE", context.getManager().getInstanceName()));
+        }
+
+        @Transition(from = "OFFLINE", to = "DROPPED")
+        public void offlineToDropped(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from OFFLINE to DROPPED", context.getManager().getInstanceName()));
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/RedisServerProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/RedisServerProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/RedisServerProcess.java
new file mode 100644
index 0000000..5f6f745
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/RedisServerProcess.java
@@ -0,0 +1,140 @@
+package org.apache.helix.autoscale.impl.container;
+
+import java.net.InetAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.autoscale.container.ContainerProcess;
+import org.apache.helix.autoscale.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * Container implementation for redis-server. Uses OnlineOffline model, spawns
+ * Redis as Shell process and writes metadata to zookeeper.
+ * 
+ */
+public class RedisServerProcess extends ContainerProcess {
+
+    static final Logger        log                  = Logger.getLogger(RedisServerProcess.class);
+
+    public static final String REDIS_SERVER_COMMAND = "redis-server";
+
+    public static final long   MONITOR_INTERVAL     = 5000;
+
+    ZkClient                   zookeeper;
+
+    final String               address;
+    final String               root;
+    final String               name;
+    final int                  basePort;
+
+    Process                    process;
+
+    ScheduledExecutorService   executor;
+
+    public RedisServerProcess(ContainerProcessProperties properties) throws Exception {
+        configure(properties);
+        setModelName("OnlineOffline");
+        setModelFactory(new RedisServerModelFactory());
+
+        address = properties.getProperty("address");
+        root = properties.getProperty("root");
+        basePort = Integer.valueOf(properties.getProperty("baseport"));
+        name = properties.getProperty(ContainerProcessProperties.NAME);
+    }
+
+    @Override
+    protected void startContainer() throws Exception {
+        log.info(String.format("starting redis server container for instance '%s'", name));
+
+        String hostname = InetAddress.getLocalHost().getHostName();
+        int port = basePort + Integer.valueOf(name.split("_")[1]);
+
+        log.debug(String.format("Starting redis server at '%s:%d'", hostname, port));
+
+        ProcessBuilder builder = new ProcessBuilder();
+        builder.command(REDIS_SERVER_COMMAND, "--port", String.valueOf(port));
+        process = builder.start();
+
+        log.debug("Updating zookeeper");
+        zookeeper = new ZkClient(address);
+        zookeeper.deleteRecursive("/" + root + "/" + name);
+        zookeeper.createPersistent("/" + root + "/" + name, true);
+        zookeeper.createPersistent("/" + root + "/" + name + "/hostname", hostname);
+        zookeeper.createPersistent("/" + root + "/" + name + "/port", String.valueOf(port));
+        zookeeper.createEphemeral("/" + root + "/" + name + "/heartbeat");
+
+        log.debug("Starting process monitor");
+        executor = Executors.newSingleThreadScheduledExecutor();
+        executor.scheduleAtFixedRate(new ProcessMonitor(), 0, MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
+
+    }
+
+    @Override
+    protected void stopContainer() throws Exception {
+        log.info("stopping redis server container");
+
+        log.debug("Stopping process monitor");
+        executor.shutdownNow();
+
+        log.debug("Updating zookeeper");
+        zookeeper.deleteRecursive("/" + root + "/" + name);
+        zookeeper.close();
+
+        log.debug("Stopping process");
+        process.destroy();
+        process.waitFor();
+    }
+
+    public class RedisServerModelFactory extends StateModelFactory<RedisServerModel> {
+        @Override
+        public RedisServerModel createNewStateModel(String partitionName) {
+            return new RedisServerModel();
+        }
+    }
+
+    @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "ONLINE", "DROPPED" })
+    public class RedisServerModel extends StateModel {
+
+        @Transition(from = "OFFLINE", to = "ONLINE")
+        public void offlineToSlave(Message m, NotificationContext context) {
+            // left blank
+            log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+        }
+
+        @Transition(from = "ONLINE", to = "OFFLINE")
+        public void slaveToOffline(Message m, NotificationContext context) {
+            // left blank
+            log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+        }
+
+        @Transition(from = "OFFLINE", to = "DROPPED")
+        public void offlineToDropped(Message m, NotificationContext context) {
+            // left blank
+            log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+        }
+
+    }
+
+    private class ProcessMonitor implements Runnable {
+        @Override
+        public void run() {
+            try {
+                process.exitValue();
+                log.warn("detected process failure");
+                fail();
+            } catch (Exception e) {
+                // expected
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/ZookeeperMasterSlaveProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/ZookeeperMasterSlaveProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/ZookeeperMasterSlaveProcess.java
new file mode 100644
index 0000000..0c1b728
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/container/ZookeeperMasterSlaveProcess.java
@@ -0,0 +1,108 @@
+package org.apache.helix.autoscale.impl.container;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.autoscale.container.ContainerProcess;
+import org.apache.helix.autoscale.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * Sample implementation of container with Helix participant for MasterSlave
+ * state model. Writes current state to separate zookeeper domain.
+ * 
+ */
+public class ZookeeperMasterSlaveProcess extends ContainerProcess {
+
+    static final Logger log = Logger.getLogger(ZookeeperMasterSlaveProcess.class);
+
+    ZkClient            zookeeper;
+
+    final String        address;
+    final String        root;
+    final String        name;
+
+    public ZookeeperMasterSlaveProcess(ContainerProcessProperties properties) throws Exception {
+        configure(properties);
+        setModelName("MasterSlave");
+        setModelFactory(new ZookeeperMasterSlaveModelFactory());
+
+        address = properties.getProperty("address");
+        root = properties.getProperty("root");
+        name = properties.getProperty(ContainerProcessProperties.NAME);
+    }
+
+    @Override
+    protected void startContainer() throws Exception {
+        log.info("starting zookeeper process container");
+
+        zookeeper = new ZkClient(address);
+        zookeeper.createPersistent("/" + root + "/" + name, true);
+    }
+
+    @Override
+    protected void stopContainer() throws Exception {
+        log.info("stopping zookeeper process container");
+
+        zookeeper.close();
+    }
+
+    public class ZookeeperMasterSlaveModelFactory extends StateModelFactory<ZookeeperMasterSlaveModel> {
+        @Override
+        public ZookeeperMasterSlaveModel createNewStateModel(String partitionName) {
+            return new ZookeeperMasterSlaveModel();
+        }
+    }
+
+    @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "SLAVE", "MASTER", "DROPPED" })
+    public class ZookeeperMasterSlaveModel extends StateModel {
+
+        @Transition(from = "OFFLINE", to = "SLAVE")
+        public void offlineToSlave(Message m, NotificationContext context) {
+            transition(m, context);
+        }
+
+        @Transition(from = "SLAVE", to = "OFFLINE")
+        public void slaveToOffline(Message m, NotificationContext context) {
+            transition(m, context);
+        }
+
+        @Transition(from = "SLAVE", to = "MASTER")
+        public void slaveToMaster(Message m, NotificationContext context) {
+            transition(m, context);
+        }
+
+        @Transition(from = "MASTER", to = "SLAVE")
+        public void masterToSlave(Message m, NotificationContext context) {
+            transition(m, context);
+        }
+
+        @Transition(from = "OFFLINE", to = "DROPPED")
+        public void offlineToDropped(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+
+            String resource = m.getResourceName();
+            String partition = m.getPartitionName();
+            String path = "/" + root + "/" + name + "/" + resource + "_" + partition;
+
+            zookeeper.delete(path);
+        }
+
+        public void transition(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+
+            String resource = m.getResourceName();
+            String partition = m.getPartitionName();
+            String path = "/" + root + "/" + name + "/" + resource + "_" + partition;
+
+            zookeeper.delete(path);
+            zookeeper.createEphemeral(path, m.getToState());
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerProvider.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerProvider.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerProvider.java
new file mode 100644
index 0000000..7e5d553
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerProvider.java
@@ -0,0 +1,119 @@
+package org.apache.helix.autoscale.impl.local;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.helix.autoscale.ContainerProvider;
+import org.apache.helix.autoscale.ContainerProviderService;
+import org.apache.helix.autoscale.container.ContainerProcess;
+import org.apache.helix.autoscale.container.ContainerProcessProperties;
+import org.apache.helix.autoscale.container.ContainerUtils;
+import org.apache.helix.autoscale.impl.local.LocalContainerSingleton.LocalProcess;
+import org.apache.helix.autoscale.provider.ProviderProperties;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * {@link ContainerProvider} spawning VM-local containers. Only works in single-VM
+ * deployments as container metadata is managed via singleton.
+ * 
+ * @see LocalContainerSingleton
+ */
+class LocalContainerProvider implements ContainerProviderService {
+
+    static final Logger           log   = Logger.getLogger(LocalContainerProvider.class);
+
+    final Map<String, Properties> types = new HashMap<String, Properties>();
+
+    String                        address;
+    String                        cluster;
+    String                        name;
+
+    @Override
+    public void configure(Properties properties) throws Exception {
+        ProviderProperties providerProperties = new ProviderProperties();
+        providerProperties.putAll(properties);
+        Preconditions.checkArgument(providerProperties.isValid());
+
+        this.address = providerProperties.getProperty("address");
+        this.cluster = providerProperties.getProperty("cluster");
+        this.name = providerProperties.getProperty("name");
+
+        for (String containerType : providerProperties.getContainers()) {
+            registerType(containerType, providerProperties.getContainer(containerType));
+        }
+    }
+
+    @Override
+    public void start() throws Exception {
+        // left blank
+    }
+
+    @Override
+    public void stop() throws Exception {
+        destroyAll();
+    }
+
+    @Override
+    public void create(String id, String type) throws Exception {
+        Map<String, LocalProcess> processes = LocalContainerSingleton.getProcesses();
+
+        synchronized (processes) {
+            Preconditions.checkState(!processes.containsKey(id), "Process '%s' already exists", id);
+            Preconditions.checkState(types.containsKey(type), "Type '%s' is not registered", type);
+
+            ContainerProcessProperties properties = new ContainerProcessProperties(types.get(type));
+
+            properties.setProperty(ContainerProcessProperties.CLUSTER, cluster);
+            properties.setProperty(ContainerProcessProperties.NAME, id);
+            properties.setProperty(ContainerProcessProperties.ADDRESS, address);
+
+            log.info(String.format("Running container '%s' (properties='%s')", id, properties));
+
+            ContainerProcess process = ContainerUtils.createProcess(properties);
+            process.start();
+
+            processes.put(id, new LocalProcess(id, name, process));
+
+        }
+    }
+
+    @Override
+    public void destroy(String id) throws Exception {
+        Map<String, LocalProcess> processes = LocalContainerSingleton.getProcesses();
+
+        synchronized (processes) {
+            if (!processes.containsKey(id))
+                throw new IllegalArgumentException(String.format("Process '%s' does not exists", id));
+
+            log.info(String.format("Destroying container '%s'", id));
+
+            LocalProcess local = processes.remove(id);
+
+            local.process.stop();
+        }
+    }
+
+    @Override
+    public void destroyAll() {
+        Map<String, LocalProcess> processes = LocalContainerSingleton.getProcesses();
+
+        synchronized (processes) {
+            log.info("Destroying all processes");
+            for (LocalProcess local : new HashSet<LocalProcess>(processes.values())) {
+                if (local.owner.equals(name)) {
+                    try { destroy(local.id); } catch (Exception ignore) {}
+                }
+            }
+        }
+    }
+
+    void registerType(String name, Properties properties) {
+        log.debug(String.format("Registering container type '%s' (properties='%s')", name, properties));
+        types.put(name, properties);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerProviderProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerProviderProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerProviderProcess.java
new file mode 100644
index 0000000..ca1047c
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerProviderProcess.java
@@ -0,0 +1,45 @@
+package org.apache.helix.autoscale.impl.local;
+
+import java.util.Properties;
+
+import org.apache.helix.autoscale.Service;
+import org.apache.helix.autoscale.provider.ProviderProcess;
+import org.apache.helix.autoscale.provider.ProviderProperties;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Configurable and runnable service for {@link LocalContainerProvider}. 
+ *
+ */
+public class LocalContainerProviderProcess implements Service {
+    LocalContainerProvider provider;
+    ProviderProcess        process;
+
+    @Override
+    public void configure(Properties properties) throws Exception {
+        ProviderProperties providerProperties = new ProviderProperties();
+        providerProperties.putAll(properties);
+        
+        Preconditions.checkArgument(providerProperties.isValid(), "provider properties not valid (properties='%s')", properties);
+        
+        provider = new LocalContainerProvider();
+        provider.configure(properties);
+
+        process = new ProviderProcess();
+        process.configure(providerProperties);
+        process.setConteinerProvider(provider);
+    }
+
+    @Override
+    public void start() throws Exception {
+        provider.start();
+        process.start();
+    }
+
+    @Override
+    public void stop() throws Exception {
+        process.stop();
+        provider.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerSingleton.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerSingleton.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerSingleton.java
new file mode 100644
index 0000000..74f9279
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalContainerSingleton.java
@@ -0,0 +1,56 @@
+package org.apache.helix.autoscale.impl.local;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.autoscale.container.ContainerProcess;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Singleton tracking metadata for VM-local containers spawned via
+ * {@link LocalContainerProvider}.
+ * 
+ */
+public class LocalContainerSingleton {
+    final static Map<String, LocalProcess> processes = new HashMap<String, LocalProcess>();
+
+    private LocalContainerSingleton() {
+        // left blank
+    }
+
+    public static Map<String, LocalProcess> getProcesses() {
+        return processes;
+    }
+
+    public static void reset() {
+        synchronized (processes) {
+            for (LocalProcess local : processes.values()) {
+                local.process.stop();
+            }
+            processes.clear();
+        }
+    }
+
+    public static void killProcess(String id) throws InterruptedException {
+        synchronized (processes) {
+            Preconditions.checkArgument(processes.containsKey(id), "Process '%s' does not exist", id);
+            ContainerProcess process = processes.get(id).process;
+            process.stop();
+            processes.remove(id);
+        }
+    }
+
+    static class LocalProcess {
+        final String           id;
+        final String           owner;
+        final ContainerProcess process;
+
+        public LocalProcess(String id, String owner, ContainerProcess process) {
+            this.id = id;
+            this.owner = owner;
+            this.process = process;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalStatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalStatusProvider.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalStatusProvider.java
new file mode 100644
index 0000000..7cb02bb
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/local/LocalStatusProvider.java
@@ -0,0 +1,53 @@
+package org.apache.helix.autoscale.impl.local;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.helix.autoscale.StatusProviderService;
+import org.apache.helix.autoscale.impl.local.LocalContainerSingleton.LocalProcess;
+
+/**
+ * StatusProvider for VM-local containers spawned via
+ * {@link LocalContainerProvider}. Runnable and configurable service.
+ * 
+ */
+public class LocalStatusProvider implements StatusProviderService {
+
+    @Override
+    public boolean exists(String id) {
+        Map<String, LocalProcess> processes = LocalContainerSingleton.getProcesses();
+
+        synchronized (processes) {
+            return processes.containsKey(id);
+        }
+    }
+
+    @Override
+    public boolean isHealthy(String id) {
+        Map<String, LocalProcess> processes = LocalContainerSingleton.getProcesses();
+
+        synchronized (processes) {
+            LocalProcess local = processes.get(id);
+
+            if (local == null)
+                return false;
+
+            return local.process.isActive();
+        }
+    }
+
+    @Override
+    public void configure(Properties properties) throws Exception {
+        // left blank
+    }
+
+    @Override
+    public void start() throws Exception {
+        // left blank
+    }
+
+    @Override
+    public void stop() throws Exception {
+        // left blank
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProcess.java
new file mode 100644
index 0000000..6847ac5
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProcess.java
@@ -0,0 +1,93 @@
+package org.apache.helix.autoscale.impl.shell;
+
+import java.io.File;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.autoscale.container.ContainerProcess;
+import org.apache.helix.autoscale.container.ContainerProcessProperties;
+import org.apache.helix.autoscale.container.ContainerUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Host process for Shell-based container. ContainerProcess configuration is
+ * read from path in first command-line argument. Status is maintained using
+ * temporary marker file. (Program entry point)
+ * 
+ */
+class ShellContainerProcess {
+    static final Logger             log              = Logger.getLogger(ShellContainerProcess.class);
+
+    public static final long        MONITOR_INTERVAL = 5000;
+
+    static String                   markerDir;
+    static ContainerProcess         process;
+    static ScheduledExecutorService executor         = Executors.newSingleThreadScheduledExecutor();
+
+    public static void main(String[] args) throws Exception {
+        final String propertiesPath = args[0];
+        markerDir = args[1];
+
+        ContainerProcessProperties properties = ContainerUtils.getPropertiesFromPath(propertiesPath);
+
+        process = ContainerUtils.createProcess(properties);
+
+        log.debug("Installing shutdown hooks");
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                log.debug("Running shutdown hook");
+                try {
+                    ShellContainerProcess.stop();
+                } catch (Exception ignore) {
+                }
+            }
+        });
+
+        log.debug("Launching shell container process");
+        process.start();
+
+        ShellUtils.createMarker(new File(markerDir));
+
+        log.debug("Launching process monitor");
+        executor.scheduleAtFixedRate(new ProcessMonitor(), 0, MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
+    }
+
+    static void stop() throws InterruptedException {
+        log.debug("Shutting down shell process");
+        if (process != null) {
+            process.stop();
+            ShellUtils.destroyMarker(new File(markerDir));
+        }
+        if (executor != null) {
+            executor.shutdownNow();
+            while (!executor.isTerminated()) {
+                Thread.sleep(100);
+            }
+            executor = null;
+        }
+    }
+
+    static class ProcessMonitor implements Runnable {
+        @Override
+        public void run() {
+            if (process.isFailed()) {
+                log.warn("detected process failure");
+                try {
+                    ShellContainerProcess.stop();
+                } catch (Exception ignore) {
+                }
+                System.exit(1);
+            }
+            if (!process.isActive()) {
+                log.warn("detected process shutdown");
+                try {
+                    ShellContainerProcess.stop();
+                } catch (Exception ignore) {
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProvider.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProvider.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProvider.java
new file mode 100644
index 0000000..df4c6ef
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProvider.java
@@ -0,0 +1,151 @@
+package org.apache.helix.autoscale.impl.shell;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.autoscale.ContainerProvider;
+import org.apache.helix.autoscale.ContainerProviderService;
+import org.apache.helix.autoscale.container.ContainerProcessProperties;
+import org.apache.helix.autoscale.impl.shell.ShellContainerSingleton.ShellProcess;
+import org.apache.helix.autoscale.provider.ProviderProperties;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.Files;
+
+/**
+ * {@link ContainerProvider} spawning shell-based containers. Only works in single-VM
+ * deployments as container metadata is managed via singleton.
+ * 
+ * @see ShellContainerSingleton
+ */
+class ShellContainerProvider implements ContainerProviderService {
+
+    static final Logger                    log               = Logger.getLogger(ShellContainerProvider.class);
+
+    static final String                    RUN_COMMAND       = "/bin/sh";
+
+    static final long                      POLL_INTERVAL     = 1000;
+    static final long                      CONTAINER_TIMEOUT = 60000;
+
+    // global view of processes required
+    static final Map<String, ShellProcess> processes         = new HashMap<String, ShellProcess>();
+
+    final Map<String, Properties>          types             = new HashMap<String, Properties>();
+
+    String                                 address;
+    String                                 cluster;
+    String                                 name;
+
+    @Override
+    public void configure(Properties properties) throws Exception {
+        Preconditions.checkNotNull(properties);
+        ProviderProperties providerProperties = new ProviderProperties();
+        providerProperties.putAll(properties);
+        Preconditions.checkArgument(providerProperties.isValid());
+
+        this.address = providerProperties.getProperty("address");
+        this.cluster = providerProperties.getProperty("cluster");
+        this.name = providerProperties.getProperty("name");
+
+        for (String containerType : providerProperties.getContainers()) {
+            registerType(containerType, providerProperties.getContainer(containerType));
+        }
+    }
+
+    @Override
+    public void start() throws Exception {
+        // left blank
+    }
+
+    @Override
+    public void stop() throws Exception {
+        destroyAll();
+    }
+
+    @Override
+    public void create(String id, String type) throws Exception {
+        Map<String, ShellProcess> processes = ShellContainerSingleton.getProcesses();
+
+        synchronized (processes) {
+            Preconditions.checkState(!processes.containsKey(id), "Process '%s' already exists", id);
+            Preconditions.checkState(types.containsKey(type), "Type '%s' is not registered", type);
+
+            ContainerProcessProperties properties = new ContainerProcessProperties(types.get(type));
+
+            properties.setProperty(ContainerProcessProperties.CLUSTER, cluster);
+            properties.setProperty(ContainerProcessProperties.NAME, id);
+            properties.setProperty(ContainerProcessProperties.ADDRESS, address);
+
+            File tmpDir = Files.createTempDir();
+            File tmpProperties = new File(tmpDir.getCanonicalPath() + File.separator + ShellUtils.SHELL_CONTAINER_PROPERTIES);
+            File tmpMarker = new File(tmpDir.getCanonicalPath());
+
+            properties.store(new FileWriter(tmpProperties), id);
+
+            log.info(String.format("Running container '%s' (properties='%s')", id, properties));
+
+            log.debug(String.format("Invoking command '%s %s %s %s'", RUN_COMMAND, ShellUtils.SHELL_CONTAINER_PATH, tmpProperties.getCanonicalPath(),
+                    tmpMarker.getCanonicalPath()));
+
+            ProcessBuilder builder = new ProcessBuilder();
+            builder.command(RUN_COMMAND, ShellUtils.SHELL_CONTAINER_PATH, tmpProperties.getCanonicalPath(), tmpMarker.getCanonicalPath());
+
+            Process process = builder.start();
+
+            processes.put(id, new ShellProcess(id, name, process, tmpDir));
+
+            long limit = System.currentTimeMillis() + CONTAINER_TIMEOUT;
+            while (!ShellUtils.hasMarker(tmpDir)) {
+                if (System.currentTimeMillis() >= limit) {
+                    throw new TimeoutException(String.format("Container '%s' failed to reach active state", id));
+                }
+                Thread.sleep(POLL_INTERVAL);
+            }
+        }
+    }
+
+    @Override
+    public void destroy(String id) throws Exception {
+        Map<String, ShellProcess> processes = ShellContainerSingleton.getProcesses();
+
+        synchronized (processes) {
+            if (!processes.containsKey(id))
+                throw new IllegalArgumentException(String.format("Process '%s' does not exists", id));
+
+            log.info(String.format("Destroying container '%s'", id));
+
+            ShellProcess shell = processes.remove(id);
+            shell.process.destroy();
+            shell.process.waitFor();
+
+            FileUtils.deleteDirectory(shell.tmpDir);
+        }
+    }
+
+    @Override
+    public void destroyAll() {
+        Map<String, ShellProcess> processes = ShellContainerSingleton.getProcesses();
+
+        synchronized (processes) {
+            log.info("Destroying all owned processes");
+            for (ShellProcess process : new HashSet<ShellProcess>(processes.values())) {
+                if (process.owner.equals(name)) {
+			        try { destroy(process.id); } catch (Exception ignore) {}
+                }
+            }
+        }
+    }
+
+    void registerType(String name, Properties properties) {
+        log.debug(String.format("Registering container type '%s' (properties='%s')", name, properties));
+        types.put(name, properties);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProviderProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProviderProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProviderProcess.java
new file mode 100644
index 0000000..1148b4e
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerProviderProcess.java
@@ -0,0 +1,45 @@
+package org.apache.helix.autoscale.impl.shell;
+
+import java.util.Properties;
+
+import org.apache.helix.autoscale.Service;
+import org.apache.helix.autoscale.provider.ProviderProcess;
+import org.apache.helix.autoscale.provider.ProviderProperties;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Configurable and runnable service for {@link ShellContainerProvider}.
+ * 
+ */
+public class ShellContainerProviderProcess implements Service {
+    ShellContainerProvider provider;
+    ProviderProcess        process;
+
+    @Override
+    public void configure(Properties properties) throws Exception {
+        ProviderProperties providerProperties = new ProviderProperties();
+        providerProperties.putAll(properties);
+
+        Preconditions.checkArgument(providerProperties.isValid(), "provider properties not valid (properties='%s')", properties);
+
+        provider = new ShellContainerProvider();
+        provider.configure(properties);
+
+        process = new ProviderProcess();
+        process.configure(providerProperties);
+        process.setConteinerProvider(provider);
+    }
+
+    @Override
+    public void start() throws Exception {
+        provider.start();
+        process.start();
+    }
+
+    @Override
+    public void stop() throws Exception {
+        process.stop();
+        provider.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerSingleton.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerSingleton.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerSingleton.java
new file mode 100644
index 0000000..a82baea
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellContainerSingleton.java
@@ -0,0 +1,58 @@
+package org.apache.helix.autoscale.impl.shell;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Singleton tracking metadata for shell-based containers spawned via
+ * {@link ShellContainerProvider}.
+ * 
+ */
+public class ShellContainerSingleton {
+    static final Map<String, ShellProcess> processes = new HashMap<String, ShellProcess>();
+
+    private ShellContainerSingleton() {
+        // left blank
+    }
+
+    public static Map<String, ShellProcess> getProcesses() {
+        return processes;
+    }
+
+    public static void reset() {
+        synchronized (processes) {
+            for (ShellProcess shell : processes.values()) {
+                shell.process.destroy();
+				try { shell.process.waitFor(); } catch(Exception ignore) {}
+            }
+            processes.clear();
+        }
+    }
+
+    public static void killProcess(String id) throws InterruptedException {
+        synchronized (processes) {
+            Preconditions.checkArgument(processes.containsKey(id), "Process '%s' does not exist", id);
+            Process process = processes.get(id).process;
+            process.destroy();
+            process.waitFor();
+            processes.remove(id);
+        }
+    }
+
+    static class ShellProcess {
+        final String  id;
+        final String  owner;
+        final Process process;
+        final File    tmpDir;
+
+        public ShellProcess(String id, String owner, Process process, File tmpDir) {
+            this.id = id;
+            this.owner = owner;
+            this.process = process;
+            this.tmpDir = tmpDir;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellStatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellStatusProvider.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellStatusProvider.java
new file mode 100644
index 0000000..8094050
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellStatusProvider.java
@@ -0,0 +1,64 @@
+package org.apache.helix.autoscale.impl.shell;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.helix.autoscale.StatusProviderService;
+import org.apache.helix.autoscale.impl.shell.ShellContainerSingleton.ShellProcess;
+
+/**
+ * StatusProvider for shell-based containers spawned via
+ * {@link ShellContainerProvider}. Runnable and configurable service.
+ * 
+ */
+public class ShellStatusProvider implements StatusProviderService {
+
+    @Override
+    public boolean exists(String id) {
+        Map<String, ShellProcess> processes = ShellContainerSingleton.getProcesses();
+
+        synchronized (processes) {
+            return processes.containsKey(id);
+        }
+    }
+
+    @Override
+    public boolean isHealthy(String id) {
+        Map<String, ShellProcess> processes = ShellContainerSingleton.getProcesses();
+
+        synchronized (processes) {
+            ShellProcess shell = processes.get(id);
+
+            if (shell == null)
+                return false;
+
+            if (!ShellUtils.hasMarker(shell.tmpDir))
+                return false;
+
+            try {
+                // exit value
+                shell.process.exitValue();
+                return false;
+            } catch (IllegalThreadStateException e) {
+                // expected
+            }
+
+            return true;
+        }
+    }
+
+    @Override
+    public void configure(Properties properties) throws Exception {
+        // left blank
+    }
+
+    @Override
+    public void start() throws Exception {
+        // left blank
+    }
+
+    @Override
+    public void stop() throws Exception {
+        // left blank
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellUtils.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellUtils.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellUtils.java
new file mode 100644
index 0000000..02df0e0
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/shell/ShellUtils.java
@@ -0,0 +1,54 @@
+package org.apache.helix.autoscale.impl.shell;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Utility for creating and destroying temporary marker files for shell-based
+ * containers.
+ * 
+ */
+class ShellUtils {
+
+    static final Logger log                        = Logger.getLogger(ShellUtils.class);
+
+    static final String SHELL_CONTAINER_PATH       = "target/metamanager-pkg/bin/shell-container-process.sh";
+    static final String SHELL_CONTAINER_PROPERTIES = "container.properties";
+    static final String SHELL_CONTAINER_MARKER     = "active";
+
+    private ShellUtils() {
+        // left blank
+    }
+
+    public static boolean hasMarker(File processDir) {
+        try {
+            log.debug(String.format("checking for marker file '%s'", getMarkerFile(processDir)));
+            if (getMarkerFile(processDir).exists())
+                return true;
+        } catch (IOException e) {
+            // ignore
+        }
+        return false;
+    }
+
+    public static void createMarker(File processDir) throws IOException {
+        log.debug(String.format("creating marker file '%s'", getMarkerFile(processDir)));
+        getMarkerFile(processDir).createNewFile();
+    }
+
+    public static void destroyMarker(File processDir) {
+        try {
+            log.debug(String.format("destroying marker file '%s'", getMarkerFile(processDir)));
+            getMarkerFile(processDir).delete();
+        } catch (IOException e) {
+            // ignore
+        }
+    }
+
+    public static File getMarkerFile(File processDir) throws IOException {
+        return new File(processDir.getCanonicalPath() + File.separatorChar + SHELL_CONTAINER_MARKER);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerData.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerData.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerData.java
new file mode 100644
index 0000000..4ebfb5d
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerData.java
@@ -0,0 +1,86 @@
+package org.apache.helix.autoscale.impl.yarn;
+
+/**
+ * Container meta data for YARN-based containers. Reflect lifecycle of container
+ * from requesting, to bootstrapping, active operation and shutdown. Read and
+ * written by {@link YarnMasterProcess}, {@link YarnContainerProvider} and
+ * {@link YarnContainerService}. Also read by {@link YarnStatusProvider}.
+ * Typically stored in zookeeper
+ * 
+ */
+class YarnContainerData {
+
+	static enum ContainerState {
+		ACQUIRE,
+		CONNECTING,
+		ACTIVE,
+		TEARDOWN,
+		FAILED,
+		HALTED,
+		FINALIZE
+	}
+	
+    String                         id;
+    ContainerState                 state;
+    int                            yarnId;
+    String                         owner;
+    YarnContainerProcessProperties properties;
+
+    public YarnContainerData() {
+        // left blank
+    }
+
+    public YarnContainerData(String id, String owner, YarnContainerProcessProperties properties) {
+        this.id = id;
+        this.state = ContainerState.ACQUIRE;
+        this.yarnId = -1;
+        this.owner = owner;
+        this.properties = properties;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public YarnContainerData setId(String id) {
+        this.id = id;
+        return this;
+    }
+
+    public ContainerState getState() {
+        return state;
+    }
+
+    public YarnContainerData setState(ContainerState state) {
+        this.state = state;
+        return this;
+    }
+
+    public int getYarnId() {
+        return yarnId;
+    }
+
+    public YarnContainerData setYarnId(int yarnId) {
+        this.yarnId = yarnId;
+        return this;
+    }
+
+    public String getOwner() {
+        return owner;
+    }
+
+    public YarnContainerData setOwner(String owner) {
+        this.owner = owner;
+        return this;
+    }
+
+    public YarnContainerProcessProperties getProperties() {
+        return properties;
+    }
+
+    public YarnContainerData setProperties(YarnContainerProcessProperties properties) {
+        this.properties = properties;
+        return this;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProcess.java
new file mode 100644
index 0000000..5f8d006
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProcess.java
@@ -0,0 +1,53 @@
+package org.apache.helix.autoscale.impl.yarn;
+
+import org.apache.helix.autoscale.container.ContainerProcess;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Host process for {@link ContainerProcess}es spawned by
+ * {@link YarnContainerProvider}. Configured via *.properties file in working
+ * directory. Corresponds to regular container in YARN and is managed jointly by
+ * the application master and the Helix participant. (Program entry point)
+ * 
+ */
+class YarnContainerProcess {
+    static final Logger log = Logger.getLogger(YarnContainerProcess.class);
+
+    public static void main(String[] args) throws Exception {
+        log.trace("BEGIN YarnProcess.main()");
+
+        final YarnContainerProcessProperties properties = YarnUtils.createContainerProcessProperties(YarnUtils
+                .getPropertiesFromPath(YarnUtils.YARN_CONTAINER_PROPERTIES));
+        Preconditions.checkArgument(properties.isValid(), "container properties not valid: %s", properties.toString());
+
+        log.debug("Launching yarndata service");
+        final ZookeeperYarnDataProvider metaService = new ZookeeperYarnDataProvider(properties.getYarnData());
+        metaService.start();
+
+        log.debug("Launching yarn container service");
+        final YarnContainerService yarnService = new YarnContainerService();
+        yarnService.configure(properties);
+        yarnService.setYarnDataProvider(metaService);
+        yarnService.start();
+
+        log.debug("Installing shutdown hooks");
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                log.debug("Running shutdown hook");
+                yarnService.stop();
+                metaService.stop();
+            }
+        }));
+
+        System.out.println("Press ENTER to stop container process");
+        System.in.read();
+
+        log.debug("Stopping container services");
+        System.exit(0);
+
+        log.trace("END YarnProcess.main()");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProcessProperties.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProcessProperties.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProcessProperties.java
new file mode 100644
index 0000000..5ad8f63
--- /dev/null
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProcessProperties.java
@@ -0,0 +1,40 @@
+package org.apache.helix.autoscale.impl.yarn;
+
+import org.apache.helix.autoscale.container.ContainerProcessProperties;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Base configuration for {@link YarnContainerProcess}. 
+ *
+ */
+public class YarnContainerProcessProperties extends ContainerProcessProperties {
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = -2209509977839674160L;
+	
+	public final static String YARNDATA = "yarndata";
+	
+	public boolean isValid() {
+		return super.isValid() &&
+		       containsKey(YARNDATA);
+	}
+	
+	public String getYarnData() {
+		return getProperty(YARNDATA);
+	}
+
+    @Override
+    public Object get(Object key) {
+        Preconditions.checkState(containsKey(key));
+        return super.get(key);
+    }
+    
+    @Override
+    public String getProperty(String key) {
+        Preconditions.checkState(containsKey(key));
+        return super.getProperty(key);
+    }
+    
+}