You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/09/20 20:30:16 UTC
[07/15] Adding Helix-task-framework and Yarn integration modules
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/cluster/RedisTargetProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/cluster/RedisTargetProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/cluster/RedisTargetProvider.java
new file mode 100644
index 0000000..94c617d
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/cluster/RedisTargetProvider.java
@@ -0,0 +1,329 @@
+package org.apache.helix.metamanager.cluster;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.metamanager.ClusterStatusProvider;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+public class RedisTargetProvider implements ClusterStatusProvider {
+
+ static final Logger log = Logger.getLogger(RedisTargetProvider.class);
+
+ public static final String BENCHMARK_COMMAND = "redis-benchmark";
+ public static final String BENCHMARK_TESTS = "GET,SET";
+
+ public static final String DEFAULT_RECORDS = "100000";
+ public static final String DEFAULT_CLIENTS = "20";
+ public static final String DEFAULT_REQUESTS = "100000";
+ public static final String DEFAULT_TIMEOUT = "8000";
+ public static final String DEFAULT_INTERVAL = "10000";
+
+ ZkClient zookeeper;
+
+ final String address;
+ final String root;
+
+ final int records;
+ final int clients;
+ final int requests;
+ final int timeout;
+ final int interval;
+
+ int targetTpsGet;
+ int targetTpsSet;
+ int targetCount = 1;
+
+ ScheduledExecutorService executor;
+
+ public RedisTargetProvider(Properties properties) {
+ address = properties.getProperty("address");
+ root = properties.getProperty("root");
+ targetTpsGet = Integer.valueOf(properties.getProperty("tps.get", "0"));
+ targetTpsSet = Integer.valueOf(properties.getProperty("tps.set", "0"));
+ records = Integer.valueOf(properties.getProperty("records", DEFAULT_RECORDS));
+ clients = Integer.valueOf(properties.getProperty("clients", DEFAULT_CLIENTS));
+ requests = Integer.valueOf(properties.getProperty("requests", DEFAULT_REQUESTS));
+ timeout = Integer.valueOf(properties.getProperty("timeout", DEFAULT_TIMEOUT));
+ interval = Integer.valueOf(properties.getProperty("interval", DEFAULT_INTERVAL));
+ }
+
+ public void startService() {
+ log.debug("starting redis status service");
+ zookeeper = new ZkClient(address);
+ zookeeper.createPersistent("/" + root, true);
+
+ // TODO not concurrency-safe, should not matter though
+ if (!zookeeper.exists("/" + root + "/target.get")) {
+ try {
+ zookeeper.createPersistent("/" + root + "/target.get", String.valueOf(targetTpsGet));
+ } catch (Exception ignore) {
+ }
+ }
+ if (!zookeeper.exists("/" + root + "/target.set")) {
+ try {
+ zookeeper.createPersistent("/" + root + "/target.set", String.valueOf(targetTpsSet));
+ } catch (Exception ignore) {
+ }
+ }
+
+ executor = Executors.newSingleThreadScheduledExecutor();
+ executor.scheduleAtFixedRate(new RedisBenchmarkRunnable(), 0, interval, TimeUnit.MILLISECONDS);
+ }
+
+ public void stopService() {
+ log.debug("stopping redis status service");
+ if (executor != null) {
+ executor.shutdownNow();
+ while (!executor.isTerminated()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ executor = null;
+ }
+ if (zookeeper != null) {
+ zookeeper.close();
+ zookeeper = null;
+ }
+ }
+
+ @Override
+ public int getTargetContainerCount(String containerType) throws Exception {
+ return targetCount;
+ }
+
+ private class RedisBenchmarkRunnable implements Runnable {
+ ExecutorService executor = Executors.newCachedThreadPool();
+ RedisResult aggregateResult;
+
+ @Override
+ public void run() {
+ log.debug("running redis benchmark");
+
+ aggregateResult = new RedisResult(0);
+ Collection<Future<RedisResult>> futures = new ArrayList<Future<RedisResult>>();
+
+ try {
+ Collection<RedisTarget> targets = getTargets();
+
+ // start benchmark
+ for (RedisTarget target : targets) {
+ log.debug(String.format("submitting target '%s'", target));
+ Future<RedisResult> future = executor.submit(new RedisCallable(target));
+ futures.add(future);
+ }
+
+ // aggregate results
+ try {
+ log.debug("waiting for results");
+
+ long limit = System.currentTimeMillis() + timeout;
+ for (Future<RedisResult> future : futures) {
+ try {
+ RedisResult result = future.get(limit - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ log.debug(String.format("got result '%s'", result));
+ aggregate(result);
+ } catch (Exception e) {
+ log.warn(String.format("failed to get result"));
+ future.cancel(true);
+ }
+ }
+ } catch (Exception e) {
+ log.error("Error running redis benchmark", e);
+
+ for (Future<RedisResult> future : futures) {
+ future.cancel(true);
+ }
+
+ return;
+ }
+
+ // compare to thresholds
+ log.debug(String.format("aggregate result is '%s'", aggregateResult));
+
+ // get target from zookeeper
+ try {
+ targetTpsGet = Integer.valueOf(zookeeper.<String> readData("/" + root + "/target.get"));
+ } catch (Exception ignore) {
+ }
+ try {
+ targetTpsSet = Integer.valueOf(zookeeper.<String> readData("/" + root + "/target.set"));
+ } catch (Exception ignore) {
+ }
+
+ // calculate counts
+ int targetCountGet = -1;
+ if (aggregateResult.containsKey("GET")) {
+ double tpsTarget = targetTpsGet;
+ double tps = aggregateResult.get("GET");
+
+ targetCountGet = (int) Math.ceil(tpsTarget / tps * aggregateResult.serverCount);
+ log.debug(String.format("count.get=%d, tps.get=%f, target.get=%f", targetCountGet, tps, tpsTarget));
+ }
+
+ int targetCountSet = -1;
+ if (aggregateResult.containsKey("SET")) {
+ double tpsTarget = targetTpsSet;
+ double tps = aggregateResult.get("SET");
+
+ targetCountSet = (int) Math.ceil(tpsTarget / tps * aggregateResult.serverCount);
+ log.debug(String.format("count.set=%d, tps.set=%f, target.set=%f", targetCountSet, tps, tpsTarget));
+ }
+
+ targetCount = Math.max(targetCountGet, targetCountSet);
+ targetCount = Math.max(targetCount, 1);
+
+ log.debug(String.format("target count is %d", targetCount));
+ RedisTargetProvider.this.targetCount = targetCount;
+
+ } catch (Exception e) {
+ log.error("Error running redis benchmark", e);
+
+ for (Future<RedisResult> future : futures) {
+ future.cancel(true);
+ }
+ }
+
+ }
+
+ Collection<RedisTarget> getTargets() {
+ log.debug("fetching redis servers from zookeeper");
+ Collection<RedisTarget> targets = new ArrayList<RedisTarget>();
+ Collection<String> servers = zookeeper.getChildren("/" + root);
+
+ servers.remove("target.get");
+ servers.remove("target.set");
+
+ for (String server : servers) {
+ String hostname = zookeeper.readData("/" + root + "/" + server + "/hostname");
+ int port = Integer.valueOf(zookeeper.<String> readData("/" + root + "/" + server + "/port"));
+
+ targets.add(new RedisTarget(hostname, port));
+ }
+
+ log.debug(String.format("found %d servers: %s", targets.size(), targets));
+ return targets;
+ }
+
+ void aggregate(RedisResult result) {
+ RedisResult newResult = new RedisResult(aggregateResult.serverCount + result.serverCount);
+
+ for (Entry<String, Double> entry : result.entrySet()) {
+ double current = 0.0d;
+ if (aggregateResult.containsKey(entry.getKey()))
+ current = aggregateResult.get(entry.getKey());
+
+ current += entry.getValue();
+ newResult.put(entry.getKey(), current);
+ }
+
+ aggregateResult = newResult;
+ }
+ }
+
+ private static class RedisTarget {
+ final String hostname;
+ final int port;
+
+ public RedisTarget(String hostname, int port) {
+ this.hostname = hostname;
+ this.port = port;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s:%d", hostname, port);
+ }
+ }
+
+ private static class RedisResult extends HashMap<String, Double> {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 4599748807597500952L;
+
+ final int serverCount;
+
+ public RedisResult(int serverCount) {
+ this.serverCount = serverCount;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("[serverCount=%d %s]", serverCount, super.toString());
+ }
+ }
+
+ private class RedisCallable implements Callable<RedisResult> {
+ final RedisTarget target;
+
+ public RedisCallable(RedisTarget target) {
+ this.target = target;
+ }
+
+ @Override
+ public RedisResult call() throws Exception {
+ log.debug(String.format("executing benchmark for '%s'", target));
+
+ ProcessBuilder builder = new ProcessBuilder();
+ builder.command(BENCHMARK_COMMAND, "-h", target.hostname, "-p", String.valueOf(target.port), "-r", String.valueOf(records), "-n",
+ String.valueOf(requests), "-c", String.valueOf(clients), "-t", BENCHMARK_TESTS, "--csv");
+ Process process = builder.start();
+
+ log.debug(String.format("running '%s'", builder.command()));
+
+ RedisResult result = new RedisResult(1);
+
+ int retVal;
+ try {
+ retVal = process.waitFor();
+ } catch (InterruptedException e) {
+ process.destroy();
+ return result;
+ }
+
+ Preconditions.checkState(retVal == 0, "Benchmark process returned %s", retVal);
+
+ Pattern pattern = Pattern.compile("\"([A-Z0-9_]+).*\",\"([0-9\\.]+)\"");
+
+ log.debug("parsing output");
+ BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ Matcher matcher = pattern.matcher(line);
+
+ if (!matcher.find())
+ continue;
+
+ String key = matcher.group(1);
+ Double value = Double.valueOf(matcher.group(2));
+
+ result.put(key, value);
+ }
+
+ log.debug(String.format("benchmark for '%s' returned '%s'", target, result));
+
+ return result;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/cluster/StaticTargetProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/cluster/StaticTargetProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/cluster/StaticTargetProvider.java
new file mode 100644
index 0000000..47bf725
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/cluster/StaticTargetProvider.java
@@ -0,0 +1,41 @@
+package org.apache.helix.metamanager.cluster;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.helix.metamanager.ClusterStatusProvider;
+
+
+/**
+ * Status provider that returns fixed, explicitly configured container counts
+ * per container type.
+ */
+public class StaticTargetProvider implements ClusterStatusProvider {
+
+    final Map<String, Integer> targetCounts = new HashMap<String, Integer>();
+
+    /** Creates a provider without any configured container types. */
+    public StaticTargetProvider() {
+        // left blank
+    }
+
+    /**
+     * Creates a provider from properties mapping container type to count.
+     *
+     * @param properties
+     *            one entry per container type; values are parsed as integers
+     * @throws NumberFormatException
+     *             if a value is not an integer
+     */
+    public StaticTargetProvider(Properties properties) {
+        for (Entry<Object, Object> entry : properties.entrySet()) {
+            String key = String.valueOf(entry.getKey());
+            // Properties.load() keeps trailing whitespace of values, so trim before parsing
+            String value = String.valueOf(entry.getValue()).trim();
+
+            targetCounts.put(key, Integer.valueOf(value));
+        }
+    }
+
+    /** Creates a provider from a copy of the given type-to-count map. */
+    public StaticTargetProvider(Map<String, Integer> targetCounts) {
+        this.targetCounts.putAll(targetCounts);
+    }
+
+    /**
+     * Returns the configured count for a container type.
+     *
+     * @throws IllegalArgumentException
+     *             if no count is configured for the type (previously this
+     *             surfaced as a NullPointerException from unboxing)
+     */
+    @Override
+    public int getTargetContainerCount(String containerType) {
+        Integer count = targetCounts.get(containerType);
+        if (count == null) {
+            throw new IllegalArgumentException(String.format("no target count configured for container type '%s'", containerType));
+        }
+        return count;
+    }
+
+    /** Sets or replaces the count for a container type. */
+    public void setTargetContainerCount(String containerType, int targetCount) {
+        targetCounts.put(containerType, targetCount);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerProcess.java
new file mode 100644
index 0000000..11ad86e
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerProcess.java
@@ -0,0 +1,133 @@
+package org.apache.helix.metamanager.container;
+
+import java.util.Properties;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.metamanager.Service;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Base service for spawnable container types. It is configured from
+ * Properties, starts the container and then connects a Helix participant to
+ * the managed cluster.
+ */
+public abstract class ContainerProcess implements Service {
+    static final Logger log = Logger.getLogger(ContainerProcess.class);
+
+    private ContainerProcessProperties properties;
+    private HelixManager participantManager;
+
+    private String modelName;
+    private StateModelFactory<? extends StateModel> modelFactory;
+
+    private String instanceName;
+    private String clusterName;
+    private String zookeeperAddress;
+
+    private boolean active = false;
+    private boolean failed = false;
+
+    /** Creates an unconfigured process; {@link #configure(Properties)} must be called before {@link #start()}. */
+    protected ContainerProcess() {
+        // left blank
+    }
+
+    /**
+     * Creates a process configured from the given properties. Subclasses that
+     * are built from a {@link ContainerProcessProperties} instance call this
+     * constructor.
+     *
+     * @throws IllegalArgumentException
+     *             if a required property is missing
+     */
+    protected ContainerProcess(ContainerProcessProperties properties) {
+        applyProperties(properties);
+    }
+
+    public final void setModelName(String modelName) {
+        this.modelName = modelName;
+    }
+
+    public final void setModelFactory(StateModelFactory<? extends StateModel> modelFactory) {
+        this.modelFactory = modelFactory;
+    }
+
+    /**
+     * Copies and validates the given properties.
+     *
+     * @throws IllegalArgumentException
+     *             if a required property is missing
+     */
+    @Override
+    public void configure(Properties properties) throws Exception {
+        applyProperties(properties);
+    }
+
+    /** Shared by constructor and configure(); private so construction never calls an overridable method. */
+    private void applyProperties(Properties source) {
+        Preconditions.checkNotNull(source, "properties must not be null");
+
+        ContainerProcessProperties containerProps = new ContainerProcessProperties();
+        containerProps.putAll(source);
+        Preconditions.checkArgument(containerProps.isValid(), "process properties not valid: %s", containerProps);
+
+        this.properties = containerProps;
+        this.instanceName = containerProps.getName();
+        this.clusterName = containerProps.getCluster();
+        this.zookeeperAddress = containerProps.getAddress();
+    }
+
+    /**
+     * Starts the container, then the Helix participant. Any failure is logged
+     * and marks the process as failed instead of being thrown.
+     */
+    @Override
+    public final void start() {
+        try {
+            Preconditions.checkNotNull(modelName, "state model name not set");
+            Preconditions.checkNotNull(modelFactory, "state model factory not set");
+            Preconditions.checkState(properties != null, "process not configured");
+            Preconditions.checkState(properties.isValid(), "process properties not valid: %s", properties.toString());
+
+            log.info(String.format("starting container '%s'", instanceName));
+            startContainer();
+
+            log.info(String.format("starting helix participant '%s'", instanceName));
+            startParticipant();
+
+            active = true;
+
+        } catch (Exception e) {
+            log.error(String.format("starting container '%s' failed", instanceName), e);
+            fail();
+        }
+    }
+
+    protected abstract void startContainer() throws Exception;
+
+    private final void startParticipant() throws Exception {
+        participantManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, zookeeperAddress);
+        participantManager.getStateMachineEngine().registerStateModelFactory(modelName, modelFactory);
+        participantManager.connect();
+    }
+
+    /**
+     * Disconnects the Helix participant, then stops the container. The container
+     * is stopped even if the disconnect fails; failures are logged, not thrown.
+     */
+    @Override
+    public final void stop() {
+        try {
+            log.info(String.format("stopping helix participant '%s'", instanceName));
+            stopParticipant();
+        } catch (Exception e) {
+            log.warn(String.format("stopping helix participant '%s' failed", instanceName), e);
+        }
+
+        try {
+            log.info(String.format("stopping container '%s'", instanceName));
+            stopContainer();
+
+            active = false;
+
+        } catch (Exception e) {
+            log.warn(String.format("stopping container '%s' failed", instanceName), e);
+        }
+    }
+
+    protected abstract void stopContainer() throws Exception;
+
+    private final void stopParticipant() {
+        if (participantManager != null) {
+            participantManager.disconnect();
+        }
+    }
+
+    /** Marks the process as failed; the flag is never reset. */
+    public final void fail() {
+        failed = true;
+    }
+
+    /** Returns true once start() succeeded, until the container is stopped or fail() is called. */
+    public final boolean isActive() {
+        return active && !failed;
+    }
+
+    public final boolean isFailed() {
+        return failed;
+    }
+
+    /** Returns the validated properties, or null if the process was never configured. */
+    public final ContainerProcessProperties getProperties() {
+        return properties;
+    }
+
+    String getModelName() {
+        return modelName;
+    }
+
+    StateModelFactory<? extends StateModel> getModelFactory() {
+        return modelFactory;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerProcessProperties.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerProcessProperties.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerProcessProperties.java
new file mode 100644
index 0000000..1a6d272
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerProcessProperties.java
@@ -0,0 +1,66 @@
+package org.apache.helix.metamanager.container;
+
+import java.util.Properties;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Base configuration for ContainerProcess.
+ *
+ */
+public class ContainerProcessProperties extends Properties {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 5754863079470995536L;
+
+ public static final String CLUSTER = "cluster";
+ public static final String ADDRESS = "address";
+ public static final String NAME = "name";
+ public static final String CONTAINER_CLASS = "class";
+
+ public ContainerProcessProperties() {
+ // left blank
+ }
+
+ public ContainerProcessProperties(Properties properties) {
+ Preconditions.checkNotNull(properties);
+ putAll(properties);
+ }
+
+ public boolean isValid() {
+ return containsKey(CLUSTER) &&
+ containsKey(NAME) &&
+ containsKey(ADDRESS) &&
+ containsKey(CONTAINER_CLASS);
+ }
+
+ public String getCluster() {
+ return getProperty(CLUSTER);
+ }
+
+ public String getAddress() {
+ return getProperty(ADDRESS);
+ }
+
+ public String getName() {
+ return getProperty(NAME);
+ }
+
+ public String getContainerClass() {
+ return getProperty(CONTAINER_CLASS);
+ }
+
+ @Override
+ public synchronized Object get(Object key) {
+ Preconditions.checkState(containsKey(key));
+ return super.get(key);
+ }
+
+ @Override
+ public String getProperty(String key) {
+ Preconditions.checkState(containsKey(key));
+ return super.getProperty(key);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerStateModel.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerStateModel.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerStateModel.java
new file mode 100644
index 0000000..9ac6b5c
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerStateModel.java
@@ -0,0 +1,64 @@
+package org.apache.helix.metamanager.container;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+@StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "SLAVE", "MASTER", "DROPPED" })
+public class ContainerStateModel extends StateModel {
+
+ static final Logger log = Logger.getLogger(ContainerStateModel.class);
+
+ @Transition(from = "OFFLINE", to = "SLAVE")
+ public void offlineToSlave(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from OFFLINE to SLAVE",
+ context.getManager().getInstanceName()));
+ }
+
+ @Transition(from = "SLAVE", to = "OFFLINE")
+ public void slaveToOffline(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from SLAVE to OFFLINE",
+ context.getManager().getInstanceName()));
+ }
+
+ @Transition(from = "SLAVE", to = "MASTER")
+ public void slaveToMaster(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from SLAVE to MASTER",
+ context.getManager().getInstanceName()));
+ }
+
+ @Transition(from = "MASTER", to = "SLAVE")
+ public void masterToSlave(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from MASTER to SLAVE",
+ context.getManager().getInstanceName()));
+ }
+
+ @Transition(from = "OFFLINE", to = "DROPPED")
+ public void offlineToDropped(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from OFFLINE to DROPPED",
+ context.getManager().getInstanceName()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerStateModelFactory.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerStateModelFactory.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerStateModelFactory.java
new file mode 100644
index 0000000..ab5a099
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerStateModelFactory.java
@@ -0,0 +1,30 @@
+package org.apache.helix.metamanager.container;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+public class ContainerStateModelFactory extends StateModelFactory<ContainerStateModel> {
+
+ @Override
+ public ContainerStateModel createNewStateModel(String partitionName) {
+ return new ContainerStateModel();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerUtils.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerUtils.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerUtils.java
new file mode 100644
index 0000000..3d32862
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/ContainerUtils.java
@@ -0,0 +1,46 @@
+package org.apache.helix.metamanager.container;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.Constructor;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Utility for loading ContainerProcessProperties and instantiating the
+ * ContainerProcess implementation they name.
+ */
+public class ContainerUtils {
+
+    static final Logger log = Logger.getLogger(ContainerUtils.class);
+
+    private ContainerUtils() {
+        // left blank
+    }
+
+    /**
+     * Instantiates the container class named in the properties through its
+     * public constructor taking a {@link ContainerProcessProperties} argument.
+     *
+     * @throws Exception
+     *             if the class cannot be loaded, lacks such a constructor, or
+     *             the constructor fails
+     */
+    public static ContainerProcess createProcess(ContainerProcessProperties properties) throws Exception {
+        String containerClassName = properties.getContainerClass();
+
+        Class<?> containerClass = Class.forName(containerClassName);
+
+        log.debug(String.format("checking for properties constructor in class '%s'", containerClassName));
+
+        Constructor<?> constructor = containerClass.getConstructor(ContainerProcessProperties.class);
+
+        return (ContainerProcess) constructor.newInstance(properties);
+    }
+
+    /**
+     * Loads properties from a resource on the system class path.
+     *
+     * @throws IOException
+     *             if the resource does not exist or cannot be read
+     */
+    public static ContainerProcessProperties getPropertiesFromResource(String resourceName) throws IOException {
+        java.io.InputStream stream = ClassLoader.getSystemResourceAsStream(resourceName);
+        if (stream == null) {
+            // previously surfaced as a NullPointerException from Properties.load()
+            throw new IOException(String.format("resource '%s' not found", resourceName));
+        }
+        try {
+            ContainerProcessProperties properties = new ContainerProcessProperties();
+            properties.load(stream);
+            return properties;
+        } finally {
+            stream.close();
+        }
+    }
+
+    /**
+     * Loads properties from a file, decoded with the platform default charset.
+     *
+     * @throws IOException
+     *             if the file does not exist or cannot be read
+     */
+    public static ContainerProcessProperties getPropertiesFromPath(String filePath) throws IOException {
+        InputStreamReader reader = new InputStreamReader(new FileInputStream(filePath));
+        try {
+            ContainerProcessProperties properties = new ContainerProcessProperties();
+            properties.load(reader);
+            return properties;
+        } finally {
+            // closing the reader also closes the underlying file stream
+            reader.close();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyMasterSlaveProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyMasterSlaveProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyMasterSlaveProcess.java
new file mode 100644
index 0000000..d91d77c
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyMasterSlaveProcess.java
@@ -0,0 +1,76 @@
+package org.apache.helix.metamanager.container.impl;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.metamanager.container.ContainerProcess;
+import org.apache.helix.metamanager.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+public class DummyMasterSlaveProcess extends ContainerProcess {
+
+ static final Logger log = Logger.getLogger(DummyMasterSlaveProcess.class);
+
+ public DummyMasterSlaveProcess(ContainerProcessProperties properties) {
+ super(properties);
+ setModelName("MasterSlave");
+ setModelFactory(new DummyMasterSlaveModelFactory());
+ }
+
+ @Override
+ protected void startContainer() throws Exception {
+ log.info("starting dummy process container");
+ }
+
+ @Override
+ protected void stopContainer() throws Exception {
+ log.info("stopping dummy process container");
+ }
+
+ public static class DummyMasterSlaveModelFactory extends StateModelFactory<DummyMasterSlaveStateModel> {
+ @Override
+ public DummyMasterSlaveStateModel createNewStateModel(String partitionName) {
+ return new DummyMasterSlaveStateModel();
+ }
+ }
+
+ @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "SLAVE", "MASTER", "DROPPED" })
+ public static class DummyMasterSlaveStateModel extends StateModel {
+
+ static final Logger log = Logger.getLogger(DummyMasterSlaveStateModel.class);
+
+ @Transition(from = "OFFLINE", to = "SLAVE")
+ public void offlineToSlave(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from OFFLINE to SLAVE",
+ context.getManager().getInstanceName()));
+ }
+
+ @Transition(from = "SLAVE", to = "OFFLINE")
+ public void slaveToOffline(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from SLAVE to OFFLINE",
+ context.getManager().getInstanceName()));
+ }
+
+ @Transition(from = "SLAVE", to = "MASTER")
+ public void slaveToMaster(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from SLAVE to MASTER",
+ context.getManager().getInstanceName()));
+ }
+
+ @Transition(from = "MASTER", to = "SLAVE")
+ public void masterToSlave(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from MASTER to SLAVE",
+ context.getManager().getInstanceName()));
+ }
+
+ @Transition(from = "OFFLINE", to = "DROPPED")
+ public void offlineToDropped(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from OFFLINE to DROPPED",
+ context.getManager().getInstanceName()));
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyOnlineOfflineProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyOnlineOfflineProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyOnlineOfflineProcess.java
new file mode 100644
index 0000000..d5015f4
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyOnlineOfflineProcess.java
@@ -0,0 +1,64 @@
+package org.apache.helix.metamanager.container.impl;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.metamanager.container.ContainerProcess;
+import org.apache.helix.metamanager.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+public class DummyOnlineOfflineProcess extends ContainerProcess {
+
+ static final Logger log = Logger.getLogger(DummyOnlineOfflineProcess.class);
+
+ public DummyOnlineOfflineProcess(ContainerProcessProperties properties) {
+ super(properties);
+ setModelName("OnlineOffline");
+ setModelFactory(new DummyOnlineOfflineModelFactory());
+ }
+
+ @Override
+ protected void startContainer() throws Exception {
+ log.info("starting dummy online-offline process container");
+ }
+
+ @Override
+ protected void stopContainer() throws Exception {
+ log.info("stopping dummy online-offline process container");
+ }
+
+ public static class DummyOnlineOfflineModelFactory extends StateModelFactory<DummyOnlineOfflineStateModel> {
+ @Override
+ public DummyOnlineOfflineStateModel createNewStateModel(String partitionName) {
+ return new DummyOnlineOfflineStateModel();
+ }
+ }
+
+ @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "ONLINE", "DROPPED" })
+ public static class DummyOnlineOfflineStateModel extends StateModel {
+
+ static final Logger log = Logger.getLogger(DummyOnlineOfflineStateModel.class);
+
+ @Transition(from = "OFFLINE", to = "ONLINE")
+ public void offlineToOnline(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from OFFLINE to ONLINE",
+ context.getManager().getInstanceName()));
+ }
+
+ @Transition(from = "ONLINE", to = "OFFLINE")
+ public void onlineToOffline(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from ONLINE to OFFLINE",
+ context.getManager().getInstanceName()));
+ }
+
+ @Transition(from = "OFFLINE", to = "DROPPED")
+ public void offlineToDropped(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from OFFLINE to DROPPED",
+ context.getManager().getInstanceName()));
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyProcess.java
new file mode 100644
index 0000000..b4963a7
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/DummyProcess.java
@@ -0,0 +1,76 @@
+package org.apache.helix.metamanager.container.impl;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.metamanager.container.ContainerProcess;
+import org.apache.helix.metamanager.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+public class DummyProcess extends ContainerProcess {
+
+ static final Logger log = Logger.getLogger(DummyProcess.class);
+
+ public DummyProcess(ContainerProcessProperties properties) {
+ super(properties);
+ setModelName("MasterSlave");
+ setModelFactory(new DummyModelFactory());
+ }
+
+ @Override
+ protected void startContainer() throws Exception {
+ log.info("starting dummy process container");
+ }
+
+ @Override
+ protected void stopContainer() throws Exception {
+ log.info("stopping dummy process container");
+ }
+
+ public static class DummyModelFactory extends StateModelFactory<DummyStateModel> {
+ @Override
+ public DummyStateModel createNewStateModel(String partitionName) {
+ return new DummyStateModel();
+ }
+ }
+
+ @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "SLAVE", "MASTER", "DROPPED" })
+ public static class DummyStateModel extends StateModel {
+
+ static final Logger log = Logger.getLogger(DummyStateModel.class);
+
+ @Transition(from = "OFFLINE", to = "SLAVE")
+ public void offlineToSlave(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from OFFLINE to SLAVE",
+ context.getManager().getInstanceName()));
+ }
+
+ @Transition(from = "SLAVE", to = "OFFLINE")
+ public void slaveToOffline(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from SLAVE to OFFLINE",
+ context.getManager().getInstanceName()));
+ }
+
+ @Transition(from = "SLAVE", to = "MASTER")
+ public void slaveToMaster(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from SLAVE to MASTER",
+ context.getManager().getInstanceName()));
+ }
+
+ @Transition(from = "MASTER", to = "SLAVE")
+ public void masterToSlave(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from MASTER to SLAVE",
+ context.getManager().getInstanceName()));
+ }
+
+ @Transition(from = "OFFLINE", to = "DROPPED")
+ public void offlineToDropped(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from OFFLINE to DROPPED",
+ context.getManager().getInstanceName()));
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/RedisServerProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/RedisServerProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/RedisServerProcess.java
new file mode 100644
index 0000000..d084a71
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/RedisServerProcess.java
@@ -0,0 +1,135 @@
+package org.apache.helix.metamanager.container.impl;
+
+import java.net.InetAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.metamanager.container.ContainerProcess;
+import org.apache.helix.metamanager.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+public class RedisServerProcess extends ContainerProcess {
+
+ static final Logger log = Logger.getLogger(RedisServerProcess.class);
+
+ public static final String REDIS_SERVER_COMMAND = "redis-server";
+
+ public static final long MONITOR_INTERVAL = 5000;
+
+ ZkClient zookeeper;
+
+ final String address;
+ final String root;
+ final String name;
+ final int basePort;
+
+ Process process;
+
+ ScheduledExecutorService executor;
+
+ public RedisServerProcess(ContainerProcessProperties properties) {
+ super(properties);
+
+ setModelName("OnlineOffline");
+ setModelFactory(new RedisServerModelFactory());
+
+ address = properties.getProperty("address");
+ root = properties.getProperty("root");
+ basePort = Integer.valueOf(properties.getProperty("baseport"));
+ name = properties.getProperty(ContainerProcessProperties.HELIX_INSTANCE);
+ }
+
+ @Override
+ protected void startContainer() throws Exception {
+ log.info(String.format("starting redis server container for instance '%s'", name));
+
+ String hostname = InetAddress.getLocalHost().getHostName();
+ int port = basePort + Integer.valueOf(name.split("_")[1]);
+
+ log.debug(String.format("Starting redis server at '%s:%d'", hostname, port));
+
+ ProcessBuilder builder = new ProcessBuilder();
+ builder.command(REDIS_SERVER_COMMAND, "--port", String.valueOf(port));
+ process = builder.start();
+
+ log.debug("Updating zookeeper");
+ zookeeper = new ZkClient(address);
+ zookeeper.deleteRecursive("/" + root + "/" + name);
+ zookeeper.createPersistent("/" + root + "/" + name, true);
+ zookeeper.createPersistent("/" + root + "/" + name + "/hostname", hostname);
+ zookeeper.createPersistent("/" + root + "/" + name + "/port", String.valueOf(port));
+
+ log.debug("Starting process monitor");
+ executor = Executors.newSingleThreadScheduledExecutor();
+ executor.scheduleAtFixedRate(new ProcessMonitor(), 0, MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
+
+ }
+
+ @Override
+ protected void stopContainer() throws Exception {
+ log.info("stopping redis server container");
+
+ log.debug("Stopping process monitor");
+ executor.shutdownNow();
+
+ log.debug("Updating zookeeper");
+ zookeeper.deleteRecursive("/" + root + "/" + name);
+ zookeeper.close();
+
+ log.debug("Stopping process");
+ process.destroy();
+ process.waitFor();
+ }
+
+ public class RedisServerModelFactory extends StateModelFactory<RedisServerModel> {
+ @Override
+ public RedisServerModel createNewStateModel(String partitionName) {
+ return new RedisServerModel();
+ }
+ }
+
+ @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "ONLINE", "DROPPED" })
+ public class RedisServerModel extends StateModel {
+
+ @Transition(from = "OFFLINE", to = "ONLINE")
+ public void offlineToSlave(Message m, NotificationContext context) {
+ // left blank
+ log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+ }
+
+ @Transition(from = "ONLINE", to = "OFFLINE")
+ public void slaveToOffline(Message m, NotificationContext context) {
+ // left blank
+ log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+ }
+
+ @Transition(from = "OFFLINE", to = "DROPPED")
+ public void offlineToDropped(Message m, NotificationContext context) {
+ // left blank
+ log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+ }
+
+ }
+
+ private class ProcessMonitor implements Runnable {
+ @Override
+ public void run() {
+ try {
+ process.exitValue();
+ log.warn("detected process failure");
+ fail();
+ } catch (Exception e) {
+ // expected
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/ZookeeperMasterSlaveProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/ZookeeperMasterSlaveProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/ZookeeperMasterSlaveProcess.java
new file mode 100644
index 0000000..f8bbc85
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/container/impl/ZookeeperMasterSlaveProcess.java
@@ -0,0 +1,104 @@
+package org.apache.helix.metamanager.container.impl;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.metamanager.container.ContainerProcess;
+import org.apache.helix.metamanager.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * Container process for the "MasterSlave" state model that mirrors the state
+ * of every partition into zookeeper: each transition rewrites an ephemeral
+ * node /root/name/resource_partition holding the new state, and dropping a
+ * partition deletes that node.
+ */
+public class ZookeeperMasterSlaveProcess extends ContainerProcess {
+
+    static final Logger log = Logger.getLogger(ZookeeperMasterSlaveProcess.class);
+
+    ZkClient zookeeper;
+
+    final String address;
+    final String root;
+    final String name;
+
+    /**
+     * Registers the model factory and reads "address", "root" and the helix
+     * instance name from the given properties.
+     *
+     * @param properties container process configuration
+     */
+    public ZookeeperMasterSlaveProcess(ContainerProcessProperties properties) {
+        super(properties);
+
+        setModelName("MasterSlave");
+        setModelFactory(new ZookeeperMasterSlaveModelFactory());
+
+        address = properties.getProperty("address");
+        root = properties.getProperty("root");
+        name = properties.getProperty(ContainerProcessProperties.HELIX_INSTANCE);
+    }
+
+    /** Connects to zookeeper and creates the node of this instance, including parents. */
+    @Override
+    protected void startContainer() throws Exception {
+        log.info("starting zookeeper process container");
+
+        zookeeper = new ZkClient(address);
+        String instanceNode = "/" + root + "/" + name;
+        zookeeper.createPersistent(instanceNode, true);
+    }
+
+    /** Closes the zookeeper client. */
+    @Override
+    protected void stopContainer() throws Exception {
+        log.info("stopping zookeeper process container");
+
+        zookeeper.close();
+    }
+
+    /** Factory handing out a fresh {@link ZookeeperMasterSlaveModel} per partition. */
+    public class ZookeeperMasterSlaveModelFactory extends StateModelFactory<ZookeeperMasterSlaveModel> {
+        @Override
+        public ZookeeperMasterSlaveModel createNewStateModel(String partitionName) {
+            return new ZookeeperMasterSlaveModel();
+        }
+    }
+
+    /** State model that records the target state of each transition in zookeeper. */
+    @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "SLAVE", "MASTER", "DROPPED" })
+    public class ZookeeperMasterSlaveModel extends StateModel {
+
+        @Transition(from = "OFFLINE", to = "SLAVE")
+        public void offlineToSlave(Message m, NotificationContext context) {
+            transition(m, context);
+        }
+
+        @Transition(from = "SLAVE", to = "OFFLINE")
+        public void slaveToOffline(Message m, NotificationContext context) {
+            transition(m, context);
+        }
+
+        @Transition(from = "SLAVE", to = "MASTER")
+        public void slaveToMaster(Message m, NotificationContext context) {
+            transition(m, context);
+        }
+
+        @Transition(from = "MASTER", to = "SLAVE")
+        public void masterToSlave(Message m, NotificationContext context) {
+            transition(m, context);
+        }
+
+        /** Removes the partition's node; no replacement node is written. */
+        @Transition(from = "OFFLINE", to = "DROPPED")
+        public void offlineToDropped(Message m, NotificationContext context) {
+            traceTransition(m, context);
+            zookeeper.delete(partitionNode(m));
+        }
+
+        /**
+         * Replaces the partition's node with an ephemeral node whose data is
+         * the target state of the message.
+         */
+        public void transition(Message m, NotificationContext context) {
+            traceTransition(m, context);
+
+            String node = partitionNode(m);
+            zookeeper.delete(node);
+            zookeeper.createEphemeral(node, m.getToState());
+        }
+
+        /** Writes the trace line shared by all transitions. */
+        private void traceTransition(Message m, NotificationContext context) {
+            log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+        }
+
+        /** Builds /root/name/resource_partition for the partition addressed by the message. */
+        private String partitionNode(Message m) {
+            String resource = m.getResourceName();
+            String partition = m.getPartitionName();
+            return "/" + root + "/" + name + "/" + resource + "_" + partition;
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/FileTargetProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/FileTargetProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/FileTargetProvider.java
new file mode 100644
index 0000000..6eac3e8
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/FileTargetProvider.java
@@ -0,0 +1,51 @@
+package org.apache.helix.metamanager.impl;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.helix.metamanager.TargetProviderService;
+
+/**
+ * File-based target model. Container count is extracted from a properties
+ * file that maps container type to count. The file is re-read on every
+ * request, so the count may change dynamically.
+ */
+public class FileTargetProvider implements TargetProviderService {
+
+    File file;
+
+    public FileTargetProvider() {
+        // left blank, file is set via configure()
+    }
+
+    /**
+     * @param path location of the properties file
+     */
+    public FileTargetProvider(String path) {
+        this.file = new File(path);
+    }
+
+    /**
+     * Loads the properties file and returns the count stored under the given
+     * container type.
+     *
+     * @param containerType key to look up in the properties file
+     * @return the configured container count
+     * @throws FileNotFoundException if the properties file does not exist
+     * @throws IOException if the properties file cannot be read
+     * @throws IllegalArgumentException if the file has no entry for the
+     *             container type (a non-numeric value surfaces as
+     *             NumberFormatException, a subclass)
+     */
+    @Override
+    public int getTargetContainerCount(String containerType) throws FileNotFoundException, IOException, IllegalArgumentException {
+        Properties properties = new Properties();
+        FileReader reader = new FileReader(file);
+        try {
+            properties.load(reader);
+        } finally {
+            reader.close();
+        }
+
+        // look up by key; Properties.contains() would test the values instead
+        String value = properties.getProperty(containerType);
+        if (value == null)
+            throw new IllegalArgumentException(String.format("container type '%s' not found in '%s'", containerType, file.getCanonicalPath()));
+
+        // Properties.load() keeps trailing whitespace of values
+        return Integer.parseInt(value.trim());
+    }
+
+    /**
+     * Sets the properties file from the "path" property.
+     *
+     * @throws IllegalArgumentException if "path" is not configured
+     */
+    @Override
+    public void configure(Properties properties) throws Exception {
+        String path = properties.getProperty("path");
+        if (path == null)
+            throw new IllegalArgumentException("property 'path' is required");
+        this.file = new File(path);
+    }
+
+    @Override
+    public void start() throws Exception {
+        // left blank
+    }
+
+    @Override
+    public void stop() throws Exception {
+        // left blank
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/RedisTargetProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/RedisTargetProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/RedisTargetProvider.java
new file mode 100644
index 0000000..1fdf96e
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/RedisTargetProvider.java
@@ -0,0 +1,356 @@
+package org.apache.helix.metamanager.impl;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.metamanager.TargetProviderService;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Redis-specific target model based on recurring Tps benchmarking. Tps target
+ * and probed redis-server instances are configured via zookeeper. Tps target
+ * may change dynamically.
+ *
+ */
+public class RedisTargetProvider implements TargetProviderService {
+
+ static final Logger log = Logger.getLogger(RedisTargetProvider.class);
+
+ public static final String BENCHMARK_COMMAND = "redis-benchmark";
+ public static final String BENCHMARK_TESTS = "GET,SET";
+
+ public static final String DEFAULT_RECORDS = "100000";
+ public static final String DEFAULT_CLIENTS = "20";
+ public static final String DEFAULT_REQUESTS = "100000";
+ public static final String DEFAULT_TIMEOUT = "8000";
+ public static final String DEFAULT_INTERVAL = "10000";
+ public static final String DEFAULT_ALPHA = "0.25";
+
+ ZkClient zookeeper;
+
+ String address;
+ String root;
+
+ int records;
+ int clients;
+ int requests;
+ int timeout;
+ int interval;
+
+ int targetTpsGet;
+ int targetTpsSet;
+
+ int targetCountMin;
+ int targetCountMax;
+ int targetCount;
+
+ double alpha;
+ double averageTpsGet;
+ double averageTpsSet;
+ double averageCount;
+
+ ScheduledExecutorService executor;
+
+ @Override
+ public void configure(Properties properties) {
+ address = properties.getProperty("address");
+ root = properties.getProperty("root");
+ targetTpsGet = Integer.valueOf(properties.getProperty("get", "0"));
+ targetTpsSet = Integer.valueOf(properties.getProperty("set", "0"));
+ targetCountMin = Integer.valueOf(properties.getProperty("min", "-1"));
+ targetCountMax = Integer.valueOf(properties.getProperty("max", "-1"));
+ records = Integer.valueOf(properties.getProperty("records", DEFAULT_RECORDS));
+ clients = Integer.valueOf(properties.getProperty("clients", DEFAULT_CLIENTS));
+ requests = Integer.valueOf(properties.getProperty("requests", DEFAULT_REQUESTS));
+ timeout = Integer.valueOf(properties.getProperty("timeout", DEFAULT_TIMEOUT));
+ interval = Integer.valueOf(properties.getProperty("interval", DEFAULT_INTERVAL));
+ alpha = Double.valueOf(properties.getProperty("alpha", DEFAULT_ALPHA));
+ }
+
+ @Override
+ public void start() {
+ log.debug("starting redis status service");
+ zookeeper = new ZkClient(address);
+ zookeeper.createPersistent("/" + root, true);
+
+ try { zookeeper.createPersistent("/" + root + "/target.get", String.valueOf(targetTpsGet)); } catch (Exception ignore) {}
+ try { zookeeper.createPersistent("/" + root + "/target.set", String.valueOf(targetTpsSet)); } catch (Exception ignore) {}
+ try { zookeeper.createPersistent("/" + root + "/target.min", String.valueOf(targetCountMin)); } catch (Exception ignore) {}
+ try { zookeeper.createPersistent("/" + root + "/target.max", String.valueOf(targetCountMax)); } catch (Exception ignore) {}
+
+ executor = Executors.newSingleThreadScheduledExecutor();
+ executor.scheduleAtFixedRate(new RedisBenchmarkRunnable(), 0, interval, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void stop() {
+ log.debug("stopping redis status service");
+ if (executor != null) {
+ executor.shutdownNow();
+ while (!executor.isTerminated()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ executor = null;
+ }
+ if (zookeeper != null) {
+ zookeeper.close();
+ zookeeper = null;
+ }
+ }
+
+ @Override
+ public int getTargetContainerCount(String containerType) throws Exception {
+ return targetCount;
+ }
+
+ private class RedisBenchmarkRunnable implements Runnable {
+ ExecutorService executor = Executors.newCachedThreadPool();
+ RedisResult aggregateResult;
+
+ @Override
+ public void run() {
+ log.debug("running redis benchmark");
+
+ aggregateResult = new RedisResult(0);
+ Collection<Future<RedisResult>> futures = new ArrayList<Future<RedisResult>>();
+
+ try {
+ Collection<RedisTarget> targets = getTargets();
+
+ // start benchmark
+ for (RedisTarget target : targets) {
+ log.debug(String.format("submitting target '%s'", target));
+ Future<RedisResult> future = executor.submit(new RedisCallable(target));
+ futures.add(future);
+ }
+
+ // aggregate results
+ try {
+ log.debug("waiting for results");
+
+ long limit = System.currentTimeMillis() + timeout;
+ for (Future<RedisResult> future : futures) {
+ try {
+ RedisResult result = future.get(limit - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ log.debug(String.format("got result '%s'", result));
+ aggregate(result);
+ } catch (Exception e) {
+ log.warn(String.format("failed to get result"));
+ future.cancel(true);
+ }
+ }
+ } catch (Exception e) {
+ log.error("Error running redis benchmark", e);
+
+ for (Future<RedisResult> future : futures) {
+ future.cancel(true);
+ }
+
+ return;
+ }
+
+ // compare to thresholds
+ log.debug(String.format("aggregate result is '%s'", aggregateResult));
+
+ // get target from zookeeper
+ try { targetTpsGet = Integer.valueOf(zookeeper.<String> readData("/" + root + "/target.get")); } catch (Exception ignore) {}
+ try { targetTpsSet = Integer.valueOf(zookeeper.<String> readData("/" + root + "/target.set")); } catch (Exception ignore) {}
+ try { targetCountMin = Integer.valueOf(zookeeper.<String> readData("/" + root + "/target.min")); } catch (Exception ignore) {}
+ try { targetCountMax = Integer.valueOf(zookeeper.<String> readData("/" + root + "/target.max")); } catch (Exception ignore) {}
+
+ averageCount = alpha * aggregateResult.serverCount + (1.0 - alpha) * averageCount;
+
+ // calculate counts
+ int targetCountGet = -1;
+ if (aggregateResult.containsKey("GET")) {
+ double tpsTarget = targetTpsGet;
+ double tps = aggregateResult.get("GET");
+
+ averageTpsGet = alpha * tps + (1.0 - alpha) * averageTpsGet;
+
+ targetCountGet = (int) Math.ceil(tpsTarget / averageTpsGet * averageCount);
+ log.debug(String.format("count.get=%d, target.get=%f, tps.get=%f, tps.avg.get=%f, count.avg=%f", targetCountGet, tpsTarget, tps,
+ averageTpsGet, averageCount));
+ }
+
+ int targetCountSet = -1;
+ if (aggregateResult.containsKey("SET")) {
+ double tpsTarget = targetTpsSet;
+ double tps = aggregateResult.get("SET");
+
+ averageTpsSet = alpha * tps + (1.0 - alpha) * averageTpsSet;
+
+ targetCountSet = (int) Math.ceil(tpsTarget / averageTpsSet * averageCount);
+ log.debug(String.format("count.set=%d, target.set=%f, tps.set=%f, tps.avg.set=%f, count.avg=%f", targetCountSet, tpsTarget, tps,
+ averageTpsSet, averageCount));
+ }
+
+ targetCount = Math.max(targetCountGet, targetCountSet);
+
+ if (targetCountMin > 0)
+ targetCount = Math.max(targetCount, targetCountMin);
+ if (targetCountMax > 0)
+ targetCount = Math.min(targetCount, targetCountMax);
+
+ targetCount = Math.max(targetCount, 1);
+
+ log.debug(String.format("target count is %d", targetCount));
+ RedisTargetProvider.this.targetCount = targetCount;
+
+ } catch (Exception e) {
+ log.error("Error running redis benchmark", e);
+
+ for (Future<RedisResult> future : futures) {
+ future.cancel(true);
+ }
+ }
+
+ }
+
+ Collection<RedisTarget> getTargets() {
+ log.debug("fetching redis servers from zookeeper");
+ Collection<RedisTarget> targets = new ArrayList<RedisTarget>();
+ Collection<String> servers = zookeeper.getChildren("/" + root);
+
+ servers.remove("target.get");
+ servers.remove("target.set");
+ servers.remove("target.min");
+ servers.remove("target.max");
+
+ for (String server : servers) {
+ if (!zookeeper.exists("/" + root + "/" + server + "/heartbeat"))
+ continue;
+
+ String hostname = zookeeper.readData("/" + root + "/" + server + "/hostname");
+ int port = Integer.valueOf(zookeeper.<String> readData("/" + root + "/" + server + "/port"));
+
+ targets.add(new RedisTarget(hostname, port));
+ }
+
+ log.debug(String.format("found %d servers: %s", targets.size(), targets));
+ return targets;
+ }
+
+ void aggregate(RedisResult result) {
+ RedisResult newResult = new RedisResult(aggregateResult.serverCount + result.serverCount);
+
+ for (Entry<String, Double> entry : result.entrySet()) {
+ double current = 0.0d;
+ if (aggregateResult.containsKey(entry.getKey()))
+ current = aggregateResult.get(entry.getKey());
+
+ current += entry.getValue();
+ newResult.put(entry.getKey(), current);
+ }
+
+ aggregateResult = newResult;
+ }
+ }
+
+ private static class RedisTarget {
+ final String hostname;
+ final int port;
+
+ public RedisTarget(String hostname, int port) {
+ this.hostname = hostname;
+ this.port = port;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s:%d", hostname, port);
+ }
+ }
+
+ private static class RedisResult extends HashMap<String, Double> {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 4599748807597500952L;
+
+ final int serverCount;
+
+ public RedisResult(int serverCount) {
+ this.serverCount = serverCount;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("[serverCount=%d %s]", serverCount, super.toString());
+ }
+ }
+
+ private class RedisCallable implements Callable<RedisResult> {
+ final RedisTarget target;
+
+ public RedisCallable(RedisTarget target) {
+ this.target = target;
+ }
+
+ @Override
+ public RedisResult call() throws Exception {
+ log.debug(String.format("executing benchmark for '%s'", target));
+
+ ProcessBuilder builder = new ProcessBuilder();
+ builder.command(BENCHMARK_COMMAND, "-h", target.hostname, "-p", String.valueOf(target.port), "-r", String.valueOf(records), "-n",
+ String.valueOf(requests), "-c", String.valueOf(clients), "-t", BENCHMARK_TESTS, "--csv");
+ Process process = builder.start();
+
+ log.debug(String.format("running '%s'", builder.command()));
+
+ RedisResult result = new RedisResult(1);
+
+ int retVal;
+ try {
+ retVal = process.waitFor();
+ } catch (InterruptedException e) {
+ process.destroy();
+ return result;
+ }
+
+ Preconditions.checkState(retVal == 0, "Benchmark process returned %s", retVal);
+
+ Pattern pattern = Pattern.compile("\"([A-Z0-9_]+).*\",\"([0-9\\.]+)\"");
+
+ log.debug("parsing output");
+ BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ Matcher matcher = pattern.matcher(line);
+
+ if (!matcher.find())
+ continue;
+
+ String key = matcher.group(1);
+ Double value = Double.valueOf(matcher.group(2));
+
+ result.put(key, value);
+ }
+
+ log.debug(String.format("benchmark for '%s' returned '%s'", target, result));
+
+ return result;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/StaticTargetProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/StaticTargetProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/StaticTargetProvider.java
new file mode 100644
index 0000000..3159fbe
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/StaticTargetProvider.java
@@ -0,0 +1,62 @@
+package org.apache.helix.metamanager.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.helix.metamanager.TargetProviderService;
+import org.apache.log4j.Logger;
+
+/**
+ * Target model based on manually set count. Count may change dynamically via
+ * setTargetContainerCount().
+ */
+public class StaticTargetProvider implements TargetProviderService {
+    static final Logger log = Logger.getLogger(StaticTargetProvider.class);
+
+    final Map<String, Integer> targetCounts = new HashMap<String, Integer>();
+
+    public StaticTargetProvider() {
+        // left blank
+    }
+
+    /**
+     * @param targetCounts initial counts by container type; the map is copied
+     */
+    public StaticTargetProvider(Map<String, Integer> targetCounts) {
+        this.targetCounts.putAll(targetCounts);
+    }
+
+    /**
+     * Returns the count currently stored for the container type.
+     *
+     * @throws IllegalArgumentException if no count is stored for the container type
+     */
+    @Override
+    public int getTargetContainerCount(String containerType) {
+        Integer count = targetCounts.get(containerType);
+        // fail with a descriptive message instead of an NPE from auto-unboxing
+        if (count == null)
+            throw new IllegalArgumentException(String.format("container type '%s' not found", containerType));
+        return count;
+    }
+
+    /** Stores or replaces the count for the container type. */
+    public void setTargetContainerCount(String containerType, int targetCount) {
+        targetCounts.put(containerType, targetCount);
+    }
+
+    /**
+     * Adds every property with an integer value as a container count.
+     * Properties whose value is not an integer are skipped with a warning.
+     */
+    @Override
+    public void configure(Properties properties) throws Exception {
+        for (Entry<Object, Object> entry : properties.entrySet()) {
+            String key = (String) entry.getKey();
+            String rawValue = (String) entry.getValue();
+
+            try {
+                int value = Integer.parseInt(rawValue.trim());
+                log.debug(String.format("Inserting value '%s = %d'", key, value));
+                targetCounts.put(key, value);
+            } catch (NumberFormatException e) {
+                log.warn(String.format("Skipping '%s', not an integer (value='%s')", key, rawValue));
+            }
+        }
+    }
+
+    @Override
+    public void start() throws Exception {
+        // left blank
+    }
+
+    @Override
+    public void stop() throws Exception {
+        // left blank
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/DummyMasterSlaveProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/DummyMasterSlaveProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/DummyMasterSlaveProcess.java
new file mode 100644
index 0000000..2d91bdd
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/DummyMasterSlaveProcess.java
@@ -0,0 +1,76 @@
+package org.apache.helix.metamanager.impl.container;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.metamanager.container.ContainerProcess;
+import org.apache.helix.metamanager.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * Sample implementation of container with Helix participant for MasterSlave
+ * state model. Print state transitions only.
+ *
+ */
+public class DummyMasterSlaveProcess extends ContainerProcess {
+
+ static final Logger log = Logger.getLogger(DummyMasterSlaveProcess.class);
+
+ public DummyMasterSlaveProcess(ContainerProcessProperties properties) throws Exception {
+ configure(properties);
+ setModelName("MasterSlave");
+ setModelFactory(new DummyMasterSlaveModelFactory());
+ }
+
+ @Override
+ protected void startContainer() throws Exception {
+ log.info("starting dummy process container");
+ }
+
+ @Override
+ protected void stopContainer() throws Exception {
+ log.info("stopping dummy process container");
+ }
+
+ public static class DummyMasterSlaveModelFactory extends StateModelFactory<DummyMasterSlaveStateModel> {
+ @Override
+ public DummyMasterSlaveStateModel createNewStateModel(String partitionName) {
+ return new DummyMasterSlaveStateModel();
+ }
+ }
+
+ @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "SLAVE", "MASTER", "DROPPED" })
+ public static class DummyMasterSlaveStateModel extends StateModel {
+
+ static final Logger log = Logger.getLogger(DummyMasterSlaveStateModel.class);
+
+ @Transition(from = "OFFLINE", to = "SLAVE")
+ public void offlineToSlave(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from OFFLINE to SLAVE", context.getManager().getInstanceName()));
+ }
+
+ @Transition(from = "SLAVE", to = "OFFLINE")
+ public void slaveToOffline(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from SLAVE to OFFLINE", context.getManager().getInstanceName()));
+ }
+
+ @Transition(from = "SLAVE", to = "MASTER")
+ public void slaveToMaster(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from SLAVE to MASTER", context.getManager().getInstanceName()));
+ }
+
+ @Transition(from = "MASTER", to = "SLAVE")
+ public void masterToSlave(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from MASTER to SLAVE", context.getManager().getInstanceName()));
+ }
+
+ @Transition(from = "OFFLINE", to = "DROPPED")
+ public void offlineToDropped(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from OFFLINE to DROPPED", context.getManager().getInstanceName()));
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/DummyOnlineOfflineProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/DummyOnlineOfflineProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/DummyOnlineOfflineProcess.java
new file mode 100644
index 0000000..62f63a8
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/DummyOnlineOfflineProcess.java
@@ -0,0 +1,66 @@
+package org.apache.helix.metamanager.impl.container;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.metamanager.container.ContainerProcess;
+import org.apache.helix.metamanager.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * Sample implementation of container with Helix participant for OnlineOffline
+ * state model. Print state transitions only.
+ *
+ */
+public class DummyOnlineOfflineProcess extends ContainerProcess {
+
+ static final Logger log = Logger.getLogger(DummyOnlineOfflineProcess.class);
+
+ public DummyOnlineOfflineProcess(ContainerProcessProperties properties) throws Exception {
+ configure(properties);
+ setModelName("OnlineOffline");
+ setModelFactory(new DummyOnlineOfflineModelFactory());
+ }
+
+ @Override
+ protected void startContainer() throws Exception {
+ log.info("starting dummy online-offline process container");
+ }
+
+ @Override
+ protected void stopContainer() throws Exception {
+ log.info("stopping dummy online-offline process container");
+ }
+
+ public static class DummyOnlineOfflineModelFactory extends StateModelFactory<DummyOnlineOfflineStateModel> {
+ @Override
+ public DummyOnlineOfflineStateModel createNewStateModel(String partitionName) {
+ return new DummyOnlineOfflineStateModel();
+ }
+ }
+
+ @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "ONLINE", "DROPPED" })
+ public static class DummyOnlineOfflineStateModel extends StateModel {
+
+ static final Logger log = Logger.getLogger(DummyOnlineOfflineStateModel.class);
+
+ @Transition(from = "OFFLINE", to = "ONLINE")
+ public void offlineToOnline(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from OFFLINE to ONLINE", context.getManager().getInstanceName()));
+ }
+
+ @Transition(from = "ONLINE", to = "OFFLINE")
+ public void onlineToOffline(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from ONLINE to OFFLINE", context.getManager().getInstanceName()));
+ }
+
+ @Transition(from = "OFFLINE", to = "DROPPED")
+ public void offlineToDropped(Message m, NotificationContext context) {
+ log.trace(String.format("%s transitioning from OFFLINE to DROPPED", context.getManager().getInstanceName()));
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/RedisServerProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/RedisServerProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/RedisServerProcess.java
new file mode 100644
index 0000000..c87f905
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/RedisServerProcess.java
@@ -0,0 +1,140 @@
+package org.apache.helix.metamanager.impl.container;
+
+import java.net.InetAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.metamanager.container.ContainerProcess;
+import org.apache.helix.metamanager.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * Container implementation for redis-server. Uses OnlineOffline model, spawns
+ * Redis as Shell process and writes metadata to zookeeper.
+ *
+ */
+public class RedisServerProcess extends ContainerProcess {
+
+ static final Logger log = Logger.getLogger(RedisServerProcess.class);
+
+ public static final String REDIS_SERVER_COMMAND = "redis-server";
+
+ public static final long MONITOR_INTERVAL = 5000;
+
+ ZkClient zookeeper;
+
+ final String address;
+ final String root;
+ final String name;
+ final int basePort;
+
+ Process process;
+
+ ScheduledExecutorService executor;
+
+ public RedisServerProcess(ContainerProcessProperties properties) throws Exception {
+ configure(properties);
+ setModelName("OnlineOffline");
+ setModelFactory(new RedisServerModelFactory());
+
+ address = properties.getProperty("address");
+ root = properties.getProperty("root");
+ basePort = Integer.valueOf(properties.getProperty("baseport"));
+ name = properties.getProperty(ContainerProcessProperties.NAME);
+ }
+
+ @Override
+ protected void startContainer() throws Exception {
+ log.info(String.format("starting redis server container for instance '%s'", name));
+
+ String hostname = InetAddress.getLocalHost().getHostName();
+ int port = basePort + Integer.valueOf(name.split("_")[1]);
+
+ log.debug(String.format("Starting redis server at '%s:%d'", hostname, port));
+
+ ProcessBuilder builder = new ProcessBuilder();
+ builder.command(REDIS_SERVER_COMMAND, "--port", String.valueOf(port));
+ process = builder.start();
+
+ log.debug("Updating zookeeper");
+ zookeeper = new ZkClient(address);
+ zookeeper.deleteRecursive("/" + root + "/" + name);
+ zookeeper.createPersistent("/" + root + "/" + name, true);
+ zookeeper.createPersistent("/" + root + "/" + name + "/hostname", hostname);
+ zookeeper.createPersistent("/" + root + "/" + name + "/port", String.valueOf(port));
+ zookeeper.createEphemeral("/" + root + "/" + name + "/heartbeat");
+
+ log.debug("Starting process monitor");
+ executor = Executors.newSingleThreadScheduledExecutor();
+ executor.scheduleAtFixedRate(new ProcessMonitor(), 0, MONITOR_INTERVAL, TimeUnit.MILLISECONDS);
+
+ }
+
+ @Override
+ protected void stopContainer() throws Exception {
+ log.info("stopping redis server container");
+
+ log.debug("Stopping process monitor");
+ executor.shutdownNow();
+
+ log.debug("Updating zookeeper");
+ zookeeper.deleteRecursive("/" + root + "/" + name);
+ zookeeper.close();
+
+ log.debug("Stopping process");
+ process.destroy();
+ process.waitFor();
+ }
+
+ public class RedisServerModelFactory extends StateModelFactory<RedisServerModel> {
+ @Override
+ public RedisServerModel createNewStateModel(String partitionName) {
+ return new RedisServerModel();
+ }
+ }
+
+ @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "ONLINE", "DROPPED" })
+ public class RedisServerModel extends StateModel {
+
+ @Transition(from = "OFFLINE", to = "ONLINE")
+ public void offlineToSlave(Message m, NotificationContext context) {
+ // left blank
+ log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+ }
+
+ @Transition(from = "ONLINE", to = "OFFLINE")
+ public void slaveToOffline(Message m, NotificationContext context) {
+ // left blank
+ log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+ }
+
+ @Transition(from = "OFFLINE", to = "DROPPED")
+ public void offlineToDropped(Message m, NotificationContext context) {
+ // left blank
+ log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+ }
+
+ }
+
+ private class ProcessMonitor implements Runnable {
+ @Override
+ public void run() {
+ try {
+ process.exitValue();
+ log.warn("detected process failure");
+ fail();
+ } catch (Exception e) {
+ // expected
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/ZookeeperMasterSlaveProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/ZookeeperMasterSlaveProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/ZookeeperMasterSlaveProcess.java
new file mode 100644
index 0000000..a493a71
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/container/ZookeeperMasterSlaveProcess.java
@@ -0,0 +1,108 @@
+package org.apache.helix.metamanager.impl.container;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.metamanager.container.ContainerProcess;
+import org.apache.helix.metamanager.container.ContainerProcessProperties;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * Sample container that registers a Helix participant using the "MasterSlave"
+ * state model and mirrors the target state of every partition into a separate
+ * zookeeper location.
+ */
+public class ZookeeperMasterSlaveProcess extends ContainerProcess {
+
+  static final Logger log = Logger.getLogger(ZookeeperMasterSlaveProcess.class);
+
+  ZkClient zookeeper;
+
+  final String address;
+  final String root;
+  final String name;
+
+  /**
+   * Applies the given properties, registers the state model factory under the
+   * "MasterSlave" model name and reads "address", "root" and the container
+   * name from the properties.
+   */
+  public ZookeeperMasterSlaveProcess(ContainerProcessProperties properties) throws Exception {
+    configure(properties);
+    setModelName("MasterSlave");
+
+    ZookeeperMasterSlaveModelFactory factory = new ZookeeperMasterSlaveModelFactory();
+    setModelFactory(factory);
+
+    address = properties.getProperty("address");
+    root = properties.getProperty("root");
+    name = properties.getProperty(ContainerProcessProperties.NAME);
+  }
+
+  /** Connects to zookeeper and creates the node of this container, including parents. */
+  @Override
+  protected void startContainer() throws Exception {
+    log.info("starting zookeeper process container");
+
+    zookeeper = new ZkClient(address);
+
+    String containerPath = "/" + root + "/" + name;
+    zookeeper.createPersistent(containerPath, true);
+  }
+
+  /** Closes the zookeeper connection. */
+  @Override
+  protected void stopContainer() throws Exception {
+    log.info("stopping zookeeper process container");
+
+    zookeeper.close();
+  }
+
+  /** Hands out a fresh {@link ZookeeperMasterSlaveModel} for every partition. */
+  public class ZookeeperMasterSlaveModelFactory extends StateModelFactory<ZookeeperMasterSlaveModel> {
+    @Override
+    public ZookeeperMasterSlaveModel createNewStateModel(String partitionName) {
+      // the partition name is not used, every partition gets an identical model
+      ZookeeperMasterSlaveModel model = new ZookeeperMasterSlaveModel();
+      return model;
+    }
+  }
+
+  /**
+   * State model that keeps one zookeeper node per resource and partition
+   * below the container node, holding the state most recently transitioned to.
+   */
+  @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "SLAVE", "MASTER", "DROPPED" })
+  public class ZookeeperMasterSlaveModel extends StateModel {
+
+    @Transition(from = "OFFLINE", to = "SLAVE")
+    public void offlineToSlave(Message m, NotificationContext context) {
+      transition(m, context);
+    }
+
+    @Transition(from = "SLAVE", to = "OFFLINE")
+    public void slaveToOffline(Message m, NotificationContext context) {
+      transition(m, context);
+    }
+
+    @Transition(from = "SLAVE", to = "MASTER")
+    public void slaveToMaster(Message m, NotificationContext context) {
+      transition(m, context);
+    }
+
+    @Transition(from = "MASTER", to = "SLAVE")
+    public void masterToSlave(Message m, NotificationContext context) {
+      transition(m, context);
+    }
+
+    /** Removes the partition node without writing a new state. */
+    @Transition(from = "OFFLINE", to = "DROPPED")
+    public void offlineToDropped(Message m, NotificationContext context) {
+      final String partitionNode = traceAndLocate(m, context);
+
+      zookeeper.delete(partitionNode);
+    }
+
+    /**
+     * Replaces the partition node with an ephemeral node carrying the target
+     * state of the message.
+     */
+    public void transition(Message m, NotificationContext context) {
+      final String partitionNode = traceAndLocate(m, context);
+
+      zookeeper.delete(partitionNode);
+      zookeeper.createEphemeral(partitionNode, m.getToState());
+    }
+
+    /**
+     * Writes the transition to the trace log and returns the zookeeper path of
+     * the node belonging to the resource and partition named in the message.
+     */
+    private String traceAndLocate(Message m, NotificationContext context) {
+      log.trace(String.format("%s transitioning from %s to %s", context.getManager().getInstanceName(), m.getFromState(), m.getToState()));
+
+      String resourceName = m.getResourceName();
+      String partitionName = m.getPartitionName();
+      return "/" + root + "/" + name + "/" + resourceName + "_" + partitionName;
+    }
+
+  }
+
+}