You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/09/20 20:30:18 UTC
[09/15] Adding Helix-task-framework and Yarn integration modules
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/assembly/assembly.xml b/recipes/meta-cluster-manager/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..03b2ca5
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/assembly/assembly.xml
@@ -0,0 +1,32 @@
+<!--
+ Assembly descriptor: packages the repo, bin and conf directories found under
+ target/metamanager-pkg into a single tar.gz archive whose base directory is
+ "metamanager".
+-->
+<assembly
+ xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2
+ http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+
+ <id>assembly</id>
+ <formats>
+ <format>tar.gz</format>
+ </formats>
+ <baseDirectory>metamanager</baseDirectory>
+ <fileSets>
+ <!-- repository contents, copied as plain (non-executable) files; the appassembler metadata file is left out -->
+ <fileSet>
+ <directory>target/metamanager-pkg/repo</directory>
+ <outputDirectory>repo</outputDirectory>
+ <excludes>
+ <exclude>**/maven-metadata-appassembler.xml</exclude>
+ </excludes>
+ <fileMode>0644</fileMode>
+ </fileSet>
+ <!-- files in bin are packaged with the executable bit set (0755) -->
+ <fileSet>
+ <directory>target/metamanager-pkg/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ <!-- configuration files, copied as plain (non-executable) files -->
+ <fileSet>
+ <directory>target/metamanager-pkg/conf</directory>
+ <outputDirectory>conf</outputDirectory>
+ <fileMode>0644</fileMode>
+ </fileSet>
+ </fileSets>
+</assembly>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/config/log4j.properties
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/config/log4j.properties b/recipes/meta-cluster-manager/src/main/config/log4j.properties
new file mode 100644
index 0000000..af33e21
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/config/log4j.properties
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Set root logger level to ERROR and its only appender to C.
+log4j.rootLogger=ERROR, C
+
+# C is set to be a ConsoleAppender.
+log4j.appender.C=org.apache.log4j.ConsoleAppender
+log4j.appender.C.layout=org.apache.log4j.PatternLayout
+log4j.appender.C.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.logger.org.I0Itec=ERROR
+log4j.logger.org.apache=ERROR
+
+log4j.logger.org.apache.helix.metamanager=INFO
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterAdmin.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterAdmin.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterAdmin.java
new file mode 100644
index 0000000..9a83f02
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterAdmin.java
@@ -0,0 +1,30 @@
+package org.apache.helix.metamanager;
+
+/**
+ * Contract for registering instance configurations (containers) with the
+ * managed cluster and for unregistering them again.
+ */
+public interface ClusterAdmin {
+
+  /**
+   * Adds the configuration of an instance to the managed cluster.
+   *
+   * @param instanceId
+   *          id of the instance to add
+   * @param instanceTag
+   *          tag to associate with the instance
+   */
+  void addInstance(String instanceId, String instanceTag);
+
+  /**
+   * Removes the configuration of an instance from the managed cluster.<br/>
+   * <b>INVARIANT:</b> idempotent
+   *
+   * @param instanceId
+   *          id of the instance to remove
+   */
+  void removeInstance(String instanceId);
+
+  /**
+   * Triggers a rebalance of any affected resource in the managed cluster.
+   */
+  void rebalance();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterContainerProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterContainerProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterContainerProvider.java
new file mode 100644
index 0000000..6aca07a
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterContainerProvider.java
@@ -0,0 +1,32 @@
+package org.apache.helix.metamanager;
+
+/**
+ * Creates and destroys containers that are addressed by a user-defined id.
+ */
+public interface ClusterContainerProvider {
+
+  /**
+   * Creates a container of the given type.
+   *
+   * @param id
+   *          unique user-defined container id
+   * @param type
+   *          container type
+   * @throws Exception
+   *           if the container could not be created
+   */
+  void create(String id, String type) throws Exception;
+
+  /**
+   * Destroys a container.
+   *
+   * @param id
+   *          unique user-defined container id
+   * @throws Exception
+   *           if the container could not be destroyed
+   */
+  void destroy(String id) throws Exception;
+
+  /**
+   * Stops all running processes and destroys all containers. Best-effort,
+   * intended for cleanup.
+   */
+  void destroyAll();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterContainerStatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterContainerStatusProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterContainerStatusProvider.java
new file mode 100644
index 0000000..e68c0ee
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterContainerStatusProvider.java
@@ -0,0 +1,7 @@
+package org.apache.helix.metamanager;
+
+/**
+ * Status queries for individual containers, addressed by container id.
+ * NOTE(review): the precise meaning of each state is defined by the
+ * implementations, which are not visible from here.
+ */
+public interface ClusterContainerStatusProvider {
+
+  /** Presumably reports whether a container with this id is known - confirm with implementations. */
+  boolean exists(String id);
+
+  /** Presumably reports whether the container is currently running - confirm with implementations. */
+  boolean isActive(String id);
+
+  /** Presumably reports whether the container has failed - confirm with implementations. */
+  boolean isFailed(String id);
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterInstanceInjector.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterInstanceInjector.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterInstanceInjector.java
new file mode 100644
index 0000000..d29e1c3
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterInstanceInjector.java
@@ -0,0 +1,6 @@
+package org.apache.helix.metamanager;
+
+/**
+ * Adds and removes instances identified by a connection string.
+ * NOTE(review): the target the instances are injected into is determined by
+ * the implementation; presumably the managed cluster - confirm.
+ */
+public interface ClusterInstanceInjector {
+
+  /**
+   * @param connection
+   *          connection string identifying the instance to add
+   */
+  void addInstance(String connection);
+
+  /**
+   * @param connection
+   *          connection string identifying the instance to remove
+   */
+  void removeInstance(String connection);
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterStatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterStatusProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterStatusProvider.java
new file mode 100644
index 0000000..1812dc3
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterStatusProvider.java
@@ -0,0 +1,5 @@
+package org.apache.helix.metamanager;
+
+/**
+ * Source of the desired (target) number of containers per container type.
+ */
+public interface ClusterStatusProvider {
+
+  /**
+   * @param containerType
+   *          container type to look up
+   * @return target number of containers of that type
+   * @throws Exception
+   *           if the target count cannot be determined
+   */
+  int getTargetContainerCount(String containerType) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ConfigTool.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ConfigTool.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ConfigTool.java
new file mode 100644
index 0000000..596743b
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ConfigTool.java
@@ -0,0 +1,47 @@
+package org.apache.helix.metamanager;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Static holder for path constants of the shell and YARN container setups,
+ * the container timeout, and the globally shared target and status
+ * providers. Cannot be instantiated.
+ */
+public class ConfigTool {
+
+  static final Logger log = Logger.getLogger(ConfigTool.class);
+
+  // ---- shell containers ----
+  public static final String SHELL_CONTAINER_PATH = "target/metamanager-pkg/bin/shell-container-process.sh";
+  public static final String SHELL_CONTAINER_PROPERTIES = "container.properties";
+  public static final String SHELL_CONTAINER_MARKER = "active";
+
+  // ---- YARN master ----
+  public static final String YARN_MASTER_ARCHIVE_PATH = "target/metamanager-assembly.tar.gz";
+  public static final String YARN_MASTER_PATH = "master/metamanager/bin/yarn-master-process.sh";
+  public static final String YARN_MASTER_STAGING = "master.tar.gz";
+  public static final String YARN_MASTER_DESTINATION = "master";
+  public static final String YARN_MASTER_PROPERTIES = "master.properties";
+
+  // ---- YARN containers ----
+  public static final String YARN_CONTAINER_ARCHIVE_PATH = "target/metamanager-assembly.tar.gz";
+  public static final String YARN_CONTAINER_STAGING = "container.tar.gz";
+  public static final String YARN_CONTAINER_PATH = "container/metamanager/bin/yarn-container-process.sh";
+  public static final String YARN_CONTAINER_DESTINATION = "container";
+  public static final String YARN_CONTAINER_PROPERTIES = "container.properties";
+
+  // presumably milliseconds - TODO confirm against the code that waits on it
+  public static final long CONTAINER_TIMEOUT = 60000;
+
+  // shared providers; null until the corresponding setter has been called
+  static TargetProvider targetProvider;
+  static StatusProvider statusProvider;
+
+  /** Static members only; never instantiated. */
+  private ConfigTool() {
+    // left blank
+  }
+
+  /** @return the shared target provider, or null if none has been set */
+  public static TargetProvider getTargetProvider() {
+    return ConfigTool.targetProvider;
+  }
+
+  /** Replaces the shared target provider. */
+  public static void setTargetProvider(TargetProvider targetProvider) {
+    ConfigTool.targetProvider = targetProvider;
+  }
+
+  /** @return the shared status provider, or null if none has been set */
+  public static StatusProvider getStatusProvider() {
+    return ConfigTool.statusProvider;
+  }
+
+  /** Replaces the shared status provider. */
+  public static void setStatusProvider(StatusProvider statusProvider) {
+    ConfigTool.statusProvider = statusProvider;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerProvider.java
new file mode 100644
index 0000000..2483bba
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerProvider.java
@@ -0,0 +1,40 @@
+package org.apache.helix.metamanager;
+
+import org.apache.helix.metamanager.provider.ProviderStateModel;
+
+/**
+ * Abstraction of a container deployment framework that creates and destroys
+ * container instances. It is invoked by ProviderStateModel, and its
+ * operations must block until they have completed.
+ *
+ * @see ProviderStateModel
+ */
+public interface ContainerProvider {
+
+  /**
+   * Creates a container of the given type.<br/>
+   * <b>INVARIANT:</b> synchronous invocation
+   *
+   * @param id
+   *          unique user-defined container id
+   * @param containerType
+   *          container type
+   * @throws Exception
+   *           if the container could not be created
+   */
+  void create(String id, String containerType) throws Exception;
+
+  /**
+   * Destroys a container.<br/>
+   * <b>INVARIANT:</b> synchronous invocation
+   *
+   * @param id
+   *          unique user-defined container id
+   * @throws Exception
+   *           if the container could not be destroyed
+   */
+  void destroy(String id) throws Exception;
+
+  /**
+   * Stops all running processes and destroys all containers. Best-effort,
+   * intended for cleanup.
+   */
+  void destroyAll();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerProviderService.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerProviderService.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerProviderService.java
new file mode 100644
index 0000000..a7da053
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerProviderService.java
@@ -0,0 +1,9 @@
+package org.apache.helix.metamanager;
+
+/**
+ * ContainerProvider as configurable service. Combines the ContainerProvider
+ * and Service contracts and declares no members of its own.
+ *
+ */
+public interface ContainerProviderService extends ContainerProvider, Service {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerStatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerStatusProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerStatusProvider.java
new file mode 100644
index 0000000..d2853d9
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerStatusProvider.java
@@ -0,0 +1,7 @@
+package org.apache.helix.metamanager;
+
+/**
+ * Status queries for individual containers, addressed by container id.
+ * NOTE(review): the precise meaning of each state is defined by the
+ * implementations, which are not visible from here.
+ */
+public interface ContainerStatusProvider {
+
+  /** Presumably reports whether a container with this id is known - confirm with implementations. */
+  boolean exists(String id);
+
+  /** Presumably reports whether the container is currently running - confirm with implementations. */
+  boolean isActive(String id);
+
+  /** Presumably reports whether the container has failed - confirm with implementations. */
+  boolean isFailed(String id);
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/FileStatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/FileStatusProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/FileStatusProvider.java
new file mode 100644
index 0000000..06e2251
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/FileStatusProvider.java
@@ -0,0 +1,27 @@
+package org.apache.helix.metamanager;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+
+/**
+ * ClusterStatusProvider backed by a Java properties file in which each key
+ * is a container type and each value is the target container count. The file
+ * is re-read on every call, so edits to it take effect on the next lookup.
+ */
+public class FileStatusProvider implements ClusterStatusProvider {
+
+  final File file;
+
+  /**
+   * @param file
+   *          properties file mapping container type to target count
+   */
+  public FileStatusProvider(File file) {
+    this.file = file;
+  }
+
+  /**
+   * Looks up the target container count for a container type.
+   *
+   * @param containerType
+   *          property key to look up
+   * @return the configured target count
+   * @throws FileNotFoundException
+   *           if the properties file does not exist
+   * @throws IOException
+   *           if the properties file cannot be read
+   * @throws IllegalArgumentException
+   *           if the file has no entry for the container type, or the entry
+   *           is not an integer (NumberFormatException)
+   */
+  @Override
+  public int getTargetContainerCount(String containerType) throws FileNotFoundException, IOException, IllegalArgumentException {
+    Properties properties = new Properties();
+
+    // close the reader even if loading fails, so no file handle is leaked
+    FileReader reader = new FileReader(file);
+    try {
+      properties.load(reader);
+    } finally {
+      reader.close();
+    }
+
+    // look the type up as a KEY; Properties.contains() would test the values
+    String count = properties.getProperty(containerType);
+    if (count == null)
+      throw new IllegalArgumentException(String.format("container type '%s' not found in '%s'", containerType, file.getCanonicalPath()));
+
+    // Properties.load() keeps trailing whitespace of values
+    return Integer.parseInt(count.trim());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/HelixClusterAdmin.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/HelixClusterAdmin.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/HelixClusterAdmin.java
new file mode 100644
index 0000000..3dd2f48
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/HelixClusterAdmin.java
@@ -0,0 +1,43 @@
+package org.apache.helix.metamanager;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Logger;
+
+/**
+ * Implementation of ClusterAdmin based on Helix.
+ *
+ */
+public class HelixClusterAdmin implements ClusterAdmin {
+
+ static final Logger log = Logger.getLogger(HelixClusterAdmin.class);
+
+ final String cluster;
+ final HelixAdmin admin;
+
+ public HelixClusterAdmin(String clusterName, HelixAdmin admin) {
+ this.cluster = clusterName;
+ this.admin = admin;
+ }
+
+ @Override
+ public synchronized void addInstance(String instanceId, String instanceTag) {
+ log.debug(String.format("injecting instance %s (tag=%s) in cluster %s", instanceId, instanceTag, cluster));
+ admin.addInstance(cluster, new InstanceConfig(instanceId));
+ admin.addInstanceTag(cluster, instanceId, instanceTag);
+ }
+
+ @Override
+ public synchronized void removeInstance(String connection) {
+ log.debug(String.format("removing instance %s from cluster %s", connection, cluster));
+ admin.dropInstance(cluster, new InstanceConfig(connection));
+ }
+
+ @Override
+ public void rebalance() {
+ for (String resourceName : admin.getResourcesInCluster(cluster)) {
+ int replica = Integer.parseInt(admin.getResourceIdealState(cluster, resourceName).getReplicas());
+ admin.rebalance(cluster, resourceName, replica);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/Manager.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/Manager.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/Manager.java
new file mode 100644
index 0000000..ab91ae7
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/Manager.java
@@ -0,0 +1,129 @@
+package org.apache.helix.metamanager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * Helix state model that maps partition state transitions onto the container
+ * life cycle: OFFLINE to ONLINE creates the container named by the partition
+ * and registers it with the managed cluster, ONLINE to OFFLINE destroys and
+ * unregisters it. The resource name of the transition message is used as the
+ * container type.
+ */
+@StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "ONLINE" })
+public class Manager extends StateModel {
+
+  static final Logger log = Logger.getLogger(Manager.class);
+
+  ClusterContainerProvider provider;
+  ClusterAdmin admin;
+
+  /**
+   * @param provider
+   *          creates and destroys the containers
+   * @param admin
+   *          registers and unregisters the containers as cluster instances
+   */
+  public Manager(ClusterContainerProvider provider, ClusterAdmin admin) {
+    this.provider = provider;
+    this.admin = admin;
+  }
+
+  /**
+   * Acquires a container: removes any leftovers with the same id, registers
+   * the instance, creates the container and triggers a best-effort rebalance.
+   *
+   * @throws Exception
+   *           if registering the instance or creating the container fails
+   */
+  @Transition(from = "OFFLINE", to = "ONLINE")
+  public void acquire(Message m, NotificationContext context) throws Exception {
+    String containerType = m.getResourceName();
+    String containerId = m.getPartitionName();
+    String instanceId = context.getManager().getInstanceName();
+
+    log.trace(String.format("%s:%s transitioning from OFFLINE to ONLINE",
+        containerId, instanceId));
+
+    bestEffortRemove(containerId);
+
+    // add instance to cluster; ClusterAdmin.addInstance requires a tag, the
+    // container type is used for it
+    admin.addInstance(containerId, containerType);
+
+    // create container
+    provider.create(containerId, containerType);
+
+    bestEffortRebalance();
+
+    log.info(String.format("%s acquired container '%s' (type='%s')",
+        instanceId, containerId, containerType));
+  }
+
+  /**
+   * Releases a container: destroys it, unregisters the instance and triggers
+   * a best-effort rebalance. Failures are logged, never propagated.
+   */
+  @Transition(from = "ONLINE", to = "OFFLINE")
+  public void release(Message m, NotificationContext context) {
+    String containerId = m.getPartitionName();
+    String instanceId = context.getManager().getInstanceName();
+
+    log.trace(String.format("%s:%s transitioning from ONLINE to OFFLINE",
+        containerId, instanceId));
+
+    bestEffortRemove(containerId);
+
+    bestEffortRebalance();
+
+    log.info(String.format("%s destroyed container '%s'",
+        instanceId, containerId));
+
+  }
+
+  /** Only traces the transition; no container action is taken. */
+  @Transition(from = "ERROR", to = "OFFLINE")
+  public void recover(Message m, NotificationContext context) {
+    String containerId = m.getPartitionName();
+    String instanceId = context.getManager().getInstanceName();
+
+    log.trace(String.format("%s:%s transitioning from ERROR to OFFLINE",
+        containerId, instanceId));
+  }
+
+  /** Only traces the transition; no container action is taken. */
+  @Transition(from = "OFFLINE", to = "DROPPED")
+  public void drop(Message m, NotificationContext context) {
+    String containerId = m.getPartitionName();
+    String instanceId = context.getManager().getInstanceName();
+
+    log.trace(String.format("%s:%s transitioning from OFFLINE to DROPPED",
+        containerId, instanceId));
+  }
+
+  /** Rebalances the managed cluster; a failure is logged and otherwise ignored. */
+  private void bestEffortRebalance() {
+    try {
+      admin.rebalance();
+    } catch (Exception e) {
+      // deliberately not rethrown: the container operation itself succeeded
+      log.warn(String.format("rebalancing cluster failed (error='%s')", e.getMessage()), e);
+    }
+  }
+
+  /**
+   * Destroys the container and removes its instance config, tolerating the
+   * failure of either step. The caught exception is attached to the debug
+   * log because the failure is not necessarily a missing container.
+   */
+  private void bestEffortRemove(String containerId) {
+    log.debug(String.format("Best effort removal of container '%s'", containerId));
+
+    try {
+      provider.destroy(containerId);
+      log.debug(String.format("Container '%s' destroyed", containerId));
+    } catch (Exception e) {
+      log.debug(String.format("Container '%s' does not exist", containerId), e);
+    }
+
+    try {
+      admin.removeInstance(containerId);
+      log.debug(String.format("Instance '%s' removed", containerId));
+    } catch (Exception e) {
+      log.debug(String.format("Instance '%s' does not exist", containerId), e);
+    }
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerDemo.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerDemo.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerDemo.java
new file mode 100644
index 0000000..35891f0
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerDemo.java
@@ -0,0 +1,463 @@
+package org.apache.helix.metamanager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.metamanager.managed.HelixClusterAdmin;
+import org.apache.helix.metamanager.managed.LocalStatusProvider;
+import org.apache.helix.metamanager.provider.local.LocalContainerProvider;
+import org.apache.helix.metamanager.provider.local.LocalContainerStatusProvider;
+import org.apache.helix.metamanager.provider.shell.ShellContainerProvider;
+import org.apache.helix.metamanager.provider.shell.ShellContainerStatusProvider;
+import org.apache.helix.metamanager.provider.yarn.ApplicationConfig;
+import org.apache.helix.metamanager.provider.yarn.YarnApplication;
+import org.apache.helix.metamanager.provider.yarn.YarnContainerProvider;
+import org.apache.helix.metamanager.provider.yarn.YarnContainerStatusProvider;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.log4j.Logger;
+
+public class ManagerDemo
+{
+ // presumably milliseconds - TODO confirm against the code that uses it
+ static final long TIMESTEP_INTERVAL = 1000;
+
+ // Paths of container launcher scripts.
+ // NOTE(review): YARN_PROCESS_PATH is an absolute path into a developer's home directory.
+ static final String MANAGED_PROCESS_PATH = "target/meta-cluster-manager-pkg/bin/container-process.sh";
+ static final String YARN_PROCESS_PATH = "/home/apucher/incubator-helix/recipes/meta-cluster-manager/target/meta-cluster-manager-pkg/bin/yarn-container-process.sh";
+
+ // Container provider types; main() defaults to PROVIDER_LOCAL and takes an override from args[0]
+ static final String PROVIDER_LOCAL = "LOCAL";
+ static final String PROVIDER_SHELL = "SHELL";
+ static final String PROVIDER_YARN = "YARN";
+
+ static final Logger log = Logger.getLogger(ManagerDemo.class);
+
+ // ZooKeeper endpoint and the names of the two clusters and their resources
+ static final int zkPort = 2199;
+ static final String zkAddress = "localhost:" + zkPort;
+ static final String metaClusterName = "meta-cluster";
+ static final String managedClusterName = "managed-cluster";
+ static final String metaResourceName = "container";
+ static final String managedResourceName = "database";
+
+ // Meta cluster sizing: main() starts at numContainerMin, grows in steps of
+ // numContainerStep while below numContainerMax, then shrinks back again
+ static final int numContainerProviders = 3;
+ static final int numContainerMax = 7;
+ static final int numContainerMin = 3;
+ static final int numContainerStep = 2;
+ static final int numContainerReplica = 1;
+
+ // Managed cluster sizing
+ static final int numManagedPartitions = 10;
+ static final int numManagedReplica = 2;
+
+ // Providers that the shutdown hook installed by main() destroys
+ static List<ClusterContainerProvider> providers = new ArrayList<ClusterContainerProvider>();
+ static int providerCount = 0;
+
+ // YARN services and applications that the shutdown hook installed by main() stops
+ static Collection<YarnContainerProvider> yarnProviders = new ArrayList<YarnContainerProvider>();
+ static Collection<YarnContainerStatusProvider> yarnStatusProviders = new ArrayList<YarnContainerStatusProvider>();
+ static Collection<YarnApplication> yarnApplications = new ArrayList<YarnApplication>();
+
+ /**
+  * Runs the meta cluster manager demo: starts a local ZooKeeper, sets up the
+  * managed and the meta cluster, starts the container providers and both
+  * controllers, then grows the target container count, destroys two
+  * containers and one provider, and shrinks the count again. The JVM is
+  * terminated via System.exit(0) at the end.
+  *
+  * @param args
+  *          optional; args[0] selects the container provider type and
+  *          defaults to PROVIDER_LOCAL
+  * @throws Exception
+  */
+ public static void main(String[] args) throws Exception
+ {
+
+   String containerProviderType = PROVIDER_LOCAL;
+   if(args.length >= 1) {
+     containerProviderType = args[0];
+   }
+
+   LocalStatusProvider clusterStatusProvider = null;
+   ManagerProcess[] managerProcesses = new ManagerProcess[numContainerProviders];
+
+   HelixManager metaControllerManager = null;
+   HelixManager managedControllerManager = null;
+
+   // make sure containers and yarn services are torn down when the JVM exits
+   Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+     @Override
+     public void run() {
+       for(ClusterContainerProvider provider : providers) {
+         log.info("Destroying all containers of provider");
+         provider.destroyAll();
+       }
+       for(YarnContainerProvider provider : yarnProviders) {
+         log.info("Stopping yarn container provider");
+         provider.stopService();
+       }
+       for(YarnContainerStatusProvider provider : yarnStatusProviders) {
+         log.info("Stopping yarn container status provider");
+         provider.stopService();
+       }
+       for(YarnApplication application: yarnApplications) {
+         log.info("Stopping yarn application");
+         try { application.stop(); } catch(Exception ignore) {}
+       }
+     }
+   }));
+
+   try
+   {
+     log.info("Starting ZooKeeper");
+     startLocalZookeeper();
+     HelixAdmin admin = new ZKHelixAdmin(zkAddress);
+
+     log.info("Create clusters");
+     admin.addCluster(metaClusterName, true);
+     admin.addCluster(managedClusterName, true);
+
+     log.info("Create providers");
+     clusterStatusProvider = new LocalStatusProvider(numContainerMin);
+     ClusterContainerStatusProvider containerStatusProvider = createContainerStatusProvider(containerProviderType);
+
+     log.info("Setup config tool");
+     ConfigTool.setClusterStatusProvider(clusterStatusProvider);
+     ConfigTool.setContainerStatusProvider(containerStatusProvider);
+
+     // Managed Cluster
+     log.info("Setup managed cluster");
+     admin.addStateModelDef(managedClusterName, "MasterSlave",
+         new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave()));
+     admin.addResource(managedClusterName, managedResourceName, numManagedPartitions,
+         "MasterSlave", IdealStateModeProperty.AUTO_REBALANCE.toString());
+     admin.rebalance(managedClusterName, managedResourceName, numManagedReplica);
+
+     // Meta Cluster
+     log.info("Setup meta cluster");
+     admin.addStateModelDef(metaClusterName, "OnlineOffline",
+         new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline()));
+     admin.addResource(metaClusterName, metaResourceName, clusterStatusProvider.getTargetContainerCount(""),
+         "OnlineOffline", IdealStateModeProperty.AUTO_REBALANCE.toString());
+
+     IdealState idealState = admin.getResourceIdealState(metaClusterName, metaResourceName);
+     idealState.setRebalancerClassName(ManagerRebalancer.class.getName());
+     //idealState.getRecord().setSimpleField(IdealStateProperty.REBALANCE_TIMER_PERIOD.toString(), "2000"); // Timer trigger creates race condition
+     admin.setResourceIdealState(metaClusterName, metaResourceName, idealState);
+     admin.rebalance(metaClusterName, metaResourceName, 1);
+
+     log.info("Starting meta processes (container providers)");
+     for (int i = 0; i < numContainerProviders; i++)
+     {
+       String instanceName = "provider_" + i;
+       admin.addInstance(metaClusterName, new InstanceConfig(instanceName));
+
+       ClusterAdmin clusterAdmin = new HelixClusterAdmin(managedClusterName, managedResourceName, numManagedReplica, admin);
+
+       managerProcesses[i] = new ManagerProcess(metaClusterName, zkAddress,
+           instanceName, createContainerProvider(containerProviderType), clusterAdmin);
+       managerProcesses[i].start();
+     }
+
+     log.info("Starting managed cluster controller");
+     managedControllerManager = HelixControllerMain.startHelixController(zkAddress,
+         managedClusterName, "managedController", HelixControllerMain.STANDALONE);
+     log.info("Starting meta cluster controller");
+     metaControllerManager = HelixControllerMain.startHelixController(zkAddress,
+         metaClusterName, "metaController", HelixControllerMain.STANDALONE);
+
+     waitUntilRebalancedCount(numContainerMin, admin);
+     printStep("Initial cluster state", admin);
+
+     // scale up in steps of numContainerStep while below numContainerMax
+     while(clusterStatusProvider.getTargetContainerCount("") < numContainerMax) {
+       int newCount = clusterStatusProvider.getTargetContainerCount("") + numContainerStep;
+
+       log.info(String.format("Increasing container count to %d", newCount));
+       clusterStatusProvider.setTargetContainerCount(newCount);
+
+       triggerPipeline(admin);
+       waitUntilRebalancedCount(newCount, admin);
+       printStep(String.format("Increased container count to %d", newCount), admin);
+     }
+
+     log.info("Destroying container 0 and container 1");
+     int currentCount = clusterStatusProvider.getTargetContainerCount("");
+     providers.get(0).destroy("container_0");
+     providers.get(0).destroy("container_1");
+     triggerPipeline(admin);
+     waitUntilRebalancedCount(currentCount, admin);
+     printStep("Destroyed container 0 and container 1", admin);
+
+     log.info("Destroying container provider 0");
+     currentCount = clusterStatusProvider.getTargetContainerCount("");
+     managerProcesses[0].stop();
+     waitUntilRebalancedCount(currentCount, admin);
+     printStep("Destroyed container provider 0", admin);
+
+     // scale back down in steps of numContainerStep while above numContainerMin
+     while(clusterStatusProvider.getTargetContainerCount("") > numContainerMin) {
+       int newCount = clusterStatusProvider.getTargetContainerCount("") - numContainerStep;
+
+       log.info(String.format("Decreasing container count to %d", newCount));
+       clusterStatusProvider.setTargetContainerCount(newCount);
+
+       triggerPipeline(admin);
+       waitUntilRebalancedCount(newCount, admin);
+       printStep(String.format("Decreased container count to %d", clusterStatusProvider.getTargetContainerCount("")), admin);
+     }
+
+     log.info("Stopping processes");
+
+   } catch (Exception e)
+   {
+     log.error("Manager demo failed", e);
+   } finally
+   {
+     if (managedControllerManager != null) {
+       log.info("Disconnecting managed cluster controller");
+       managedControllerManager.disconnect();
+     }
+     if (metaControllerManager != null) {
+       log.info("Disconnecting meta cluster controller");
+       metaControllerManager.disconnect();
+     }
+     log.info("Destroying meta processes");
+     for (ManagerProcess process : managerProcesses) {
+       // slots are still null when setup failed before the process was created
+       if (process != null) {
+         process.stop();
+       }
+     }
+   }
+
+   // TODO clean up threads correctly
+   System.exit(0);
+ }
+
+/**
+ * Reads the ideal state of the meta resource and writes it back unchanged.
+ * Presumably the write is what makes the meta cluster controller re-run its pipeline.
+ *
+ * @param admin Helix admin used for both the read and the write
+ */
+private static void triggerPipeline(HelixAdmin admin) {
+    admin.setResourceIdealState(metaClusterName, metaResourceName,
+        admin.getResourceIdealState(metaClusterName, metaResourceName));
+}
+
+ private static void printStep(String text, HelixAdmin admin) throws Exception {
+ log.info("********************************************************************************");
+ log.info(text);
+ log.info("********************************************************************************");
+ printClusterStatus(admin);
+
+ System.out.println("Press ENTER to continue");
+ System.in.read();
+ }
+
+ static void printClusterStatus(HelixAdmin admin) throws Exception { // logs the managed cluster status first, then the meta cluster status
+ log.info("Managed cluster status");
+ printStatusMasterSlave(admin); // master/slave table of the managed resource
+ log.info("Meta cluster status");
+ printMetaClusterStatus(admin); // container -> acquiring provider table of the meta resource
+ }
+
+ /**
+  * Polls every {@code TIMESTEP_INTERVAL} milliseconds until both the meta cluster and the
+  * managed cluster report exactly {@code containerCount} containers, then runs the
+  * best-possible/external-view verifier for the managed cluster. There is no timeout.
+  *
+  * @param containerCount expected number of containers in both clusters
+  * @param admin Helix admin used to read the external views
+  * @throws InterruptedException if the thread is interrupted while sleeping
+  */
+ static void waitUntilRebalancedCount(int containerCount, HelixAdmin admin) throws InterruptedException {
+     boolean settled;
+     do {
+         // always sleep at least once before looking at the cluster
+         Thread.sleep(TIMESTEP_INTERVAL);
+         settled = getMetaContainerCount(admin) == containerCount
+             && getManagedContainerCount(admin) == containerCount;
+     } while (!settled);
+     ClusterStateVerifier.verifyByPolling(new BestPossAndExtViewZkVerifier(zkAddress, managedClusterName));
+ }
+
+ /**
+  * Counts the partitions (containers) of the meta resource for which the external view
+  * lists at least one instance in state ONLINE.
+  *
+  * @param admin Helix admin used to fetch the external view
+  * @return number of partitions with an ONLINE instance; 0 while no external view exists
+  */
+ static int getMetaContainerCount(HelixAdmin admin) {
+     ExternalView externalView = admin.getResourceExternalView(metaClusterName, metaResourceName);
+     if (externalView == null) {
+         // the external view may not have been published yet while callers are polling
+         return 0;
+     }
+
+     // partition names are unique within the partition set, so a plain counter suffices
+     int onlinePartitions = 0;
+     for (String partitionName : externalView.getPartitionSet()) {
+         Map<String, String> stateMap = externalView.getStateMap(partitionName);
+         if (stateMap != null && stateMap.containsValue("ONLINE")) {
+             onlinePartitions++;
+         }
+     }
+     return onlinePartitions;
+ }
+
+ /**
+  * Counts the distinct instances that hold at least one partition of the managed resource
+  * in state MASTER or SLAVE according to the external view.
+  *
+  * @param admin Helix admin used to fetch the external view
+  * @return number of distinct serving instances; 0 while no external view exists
+  */
+ static int getManagedContainerCount(HelixAdmin admin) {
+     ExternalView externalView = admin.getResourceExternalView(managedClusterName, managedResourceName);
+     if (externalView == null) {
+         // the external view may not have been published yet while callers are polling
+         return 0;
+     }
+
+     Set<String> assignedInstances = new HashSet<String>();
+     for (String partitionName : externalView.getPartitionSet()) {
+         Map<String, String> stateMap = externalView.getStateMap(partitionName);
+         if (stateMap == null) {
+             continue;
+         }
+
+         for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+             String state = entry.getValue();
+             if ("MASTER".equals(state) || "SLAVE".equals(state)) {
+                 assignedInstances.add(entry.getKey());
+             }
+         }
+     }
+     return assignedInstances.size();
+ }
+
+ /**
+  * Logs one line per container (partition of the meta resource, sorted by name) together
+  * with the first instance listed as ONLINE for it, or "NONE" if there is none.
+  *
+  * @param admin Helix admin used to fetch the external view
+  */
+ static void printMetaClusterStatus(HelixAdmin admin) {
+     ExternalView view = admin.getResourceExternalView(metaClusterName, metaResourceName);
+     TreeSet<String> sortedPartitions = new TreeSet<String>(view.getPartitionSet());
+
+     log.info("container\tacquired by");
+     log.info("======================================");
+     for (String partitionName : sortedPartitions) {
+         String acquiredBy = null;
+         Map<String, String> stateMap = view.getStateMap(partitionName);
+         if (stateMap != null) {
+             for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+                 if ("ONLINE".equals(entry.getValue())) {
+                     acquiredBy = entry.getKey();
+                     break;
+                 }
+             }
+         }
+         String owner = (acquiredBy == null) ? "NONE" : acquiredBy;
+         log.info(partitionName + "\t" + owner);
+     }
+ }
+
+ /**
+  * Logs one line per partition of the managed resource (sorted by name) with the instance
+  * listed as MASTER and the instance listed as SLAVE, or "NONE" where there is none.
+  * If several instances share a state, the last one iterated is shown.
+  *
+  * @param admin Helix admin used to fetch the external view
+  */
+ static void printStatusMasterSlave(HelixAdmin admin) {
+     ExternalView view = admin.getResourceExternalView(managedClusterName, managedResourceName);
+     TreeSet<String> sortedPartitions = new TreeSet<String>(view.getPartitionSet());
+
+     log.info("partition\tmaster\t\tslave");
+     log.info("============================================================");
+     for (String partitionName : sortedPartitions) {
+         String master = "NONE";
+         String slave = "NONE";
+         Map<String, String> stateMap = view.getStateMap(partitionName);
+         if (stateMap != null) {
+             for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+                 String state = entry.getValue();
+                 if ("MASTER".equals(state)) {
+                     master = entry.getKey();
+                 } else if ("SLAVE".equals(state)) {
+                     slave = entry.getKey();
+                 }
+             }
+         }
+         log.info(String.format("%s\t%s\t%s", partitionName, master, slave));
+     }
+ }
+
+ public static void startLocalZookeeper() throws Exception
+ {
+ ZkServer server = null;
+ String baseDir = "/tmp/IntegrationTest/";
+ final String dataDir = baseDir + "zk/dataDir";
+ final String logDir = baseDir + "/tmp/logDir";
+ FileUtils.deleteDirectory(new File(dataDir));
+ FileUtils.deleteDirectory(new File(logDir));
+
+ IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace()
+ {
+ @Override
+ public void createDefaultNameSpace(ZkClient zkClient)
+ {
+
+ }
+ };
+ server = new ZkServer(dataDir, logDir, defaultNameSpace, zkPort);
+ server.start();
+
+ }
+
+ private static ClusterContainerProvider createContainerProvider(String type) throws Exception {
+ String providerName = "provider_" + providerCount;
+ providerCount++;
+
+ if(PROVIDER_LOCAL.equalsIgnoreCase(type)) {
+ log.info("Using VM-local container provider");
+ LocalContainerProvider provider = new LocalContainerProvider(zkAddress, managedClusterName, providerName);
+ providers.add(provider);
+ return provider;
+ } else if (PROVIDER_SHELL.equalsIgnoreCase(type)) {
+ log.info("Using shell-based container provider");
+ ShellContainerProvider provider = new ShellContainerProvider(zkAddress, managedClusterName, providerName, MANAGED_PROCESS_PATH);
+ providers.add(provider);
+ return provider;
+ } else if (PROVIDER_YARN.equalsIgnoreCase(type)) {
+ ApplicationConfig appConfig = new ApplicationConfig(zkAddress, managedClusterName, zkAddress, providerName);
+
+ log.info("Using yarn-based container provider");
+ YarnApplication yarnApplication = new YarnApplication(appConfig);
+ yarnApplication.start();
+ yarnApplications.add(yarnApplication);
+
+ YarnContainerProvider yarnProvider = new YarnContainerProvider(appConfig, YARN_PROCESS_PATH);
+ yarnProvider.startService();
+ yarnProviders.add(yarnProvider);
+
+ providers.add(yarnProvider);
+ return yarnProvider;
+ } else {
+ throw new IllegalArgumentException(String.format("Unknown container provider type '%s'", type));
+ }
+ }
+
+ /**
+  * Creates the container status provider matching the given provider type. The YARN
+  * variant is additionally started and recorded in {@code yarnStatusProviders}.
+  *
+  * @param type provider type, matched case-insensitively against the PROVIDER_* constants
+  * @return the newly created status provider
+  * @throws IllegalArgumentException if the type is not known
+  * @throws Exception if starting the YARN status provider fails
+  */
+ private static ClusterContainerStatusProvider createContainerStatusProvider(String type) throws Exception {
+     if (PROVIDER_LOCAL.equalsIgnoreCase(type)) {
+         log.info("Using VM-local container status provider");
+         return new LocalContainerStatusProvider();
+     }
+
+     if (PROVIDER_SHELL.equalsIgnoreCase(type)) {
+         log.info("Using shell-based container status provider");
+         return new ShellContainerStatusProvider();
+     }
+
+     if (PROVIDER_YARN.equalsIgnoreCase(type)) {
+         log.info("Using yarn-based container status provider");
+         YarnContainerStatusProvider yarnStatus = new YarnContainerStatusProvider(zkAddress);
+         yarnStatus.startService();
+         yarnStatusProviders.add(yarnStatus);
+         return yarnStatus;
+     }
+
+     throw new IllegalArgumentException(String.format("Unknown container status provider type '%s'", type));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerFactory.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerFactory.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerFactory.java
new file mode 100644
index 0000000..44a924e
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerFactory.java
@@ -0,0 +1,39 @@
+package org.apache.helix.metamanager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+/**
+ * State model factory that creates a new {@link Manager} for every partition. All created
+ * managers share the container provider and cluster admin given to this factory.
+ */
+public class ManagerFactory extends StateModelFactory<Manager> {
+
+    final ClusterContainerProvider provider;
+    final ClusterAdmin admin;
+
+    /**
+     * @param provider container provider passed on to every created manager
+     * @param admin cluster admin passed on to every created manager
+     */
+    public ManagerFactory(ClusterContainerProvider provider, ClusterAdmin admin) {
+        this.provider = provider;
+        this.admin = admin;
+    }
+
+    /**
+     * @param partitionName name of the partition the state model is requested for (not used)
+     * @return a fresh manager backed by this factory's provider and admin
+     */
+    @Override
+    public Manager createNewStateModel(String partitionName) {
+        return new Manager(this.provider, this.admin);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerProcess.java
new file mode 100644
index 0000000..7812e6f
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerProcess.java
@@ -0,0 +1,67 @@
+package org.apache.helix.metamanager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.log4j.Logger;
+
+/**
+ * Helix participant that registers a {@link ManagerFactory} for the "OnlineOffline" state
+ * model and connects to the given cluster under the given instance name.
+ */
+public class ManagerProcess
+{
+    static final Logger log = Logger.getLogger(ManagerProcess.class);
+
+    final String clusterName;
+    final String zkAddress;
+    final String instanceName;
+    final ClusterContainerProvider provider;
+    final ClusterAdmin admin;
+
+    // created by start(); stays null until then
+    HelixManager participantManager;
+
+    /**
+     * @param clusterName name of the cluster to join
+     * @param zkAddress ZooKeeper address handed to the Helix manager
+     * @param instanceName participant instance name
+     * @param provider container provider handed to the state model factory
+     * @param admin cluster admin handed to the state model factory
+     */
+    ManagerProcess(String clusterName, String zkAddress, String instanceName, ClusterContainerProvider provider, ClusterAdmin admin)
+    {
+        this.clusterName = clusterName;
+        this.zkAddress = zkAddress;
+        this.instanceName = instanceName;
+        this.provider = provider;
+        this.admin = admin;
+    }
+
+    /**
+     * Creates the participant manager, registers the state model factory and connects.
+     *
+     * @throws Exception if creating or connecting the manager fails
+     */
+    public void start() throws Exception
+    {
+        log.info("STARTING " + instanceName);
+
+        // assign the field first so stop() can disconnect even if connect() fails
+        this.participantManager = HelixManagerFactory.getZKHelixManager(
+            clusterName, instanceName, InstanceType.PARTICIPANT, zkAddress);
+
+        ManagerFactory factory = new ManagerFactory(provider, admin);
+        this.participantManager.getStateMachineEngine().registerStateModelFactory("OnlineOffline", factory);
+        this.participantManager.connect();
+
+        log.info("STARTED " + instanceName);
+    }
+
+    /**
+     * Disconnects the participant manager; does nothing if start() was never called.
+     */
+    public void stop()
+    {
+        if (participantManager == null)
+        {
+            return;
+        }
+        participantManager.disconnect();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerRebalancer.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerRebalancer.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerRebalancer.java
new file mode 100644
index 0000000..2b2824c
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerRebalancer.java
@@ -0,0 +1,167 @@
+package org.apache.helix.metamanager;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.log4j.Logger;
+
+/**
+ * Rebalancer for cluster state. Uses cluster status provider.<br/>
+ * <br/>
+ * IdealState mapping:<br/>
+ * resource = tag-name<br/>
+ * partition = logical container<br/>
+ * instance = resource provider<br/>
+ * status = physical container presence
+ *
+ */
+/**
+ * Rebalancer for cluster state. Uses the cluster status provider for the target container
+ * count and the container status provider to detect missing or failed containers.<br/>
+ * <br/>
+ * IdealState mapping:<br/>
+ * resource = tag-name<br/>
+ * partition = logical container<br/>
+ * instance = resource provider<br/>
+ * status = physical container presence
+ */
+public class ManagerRebalancer implements Rebalancer {
+
+    static final Logger log = Logger.getLogger(ManagerRebalancer.class);
+
+    // The three members below were only referenced by an update throttle that has been
+    // disabled; they are kept so that code outside this class referring to them still compiles.
+    static final long UPDATE_INTERVAL_MIN = 1500;
+
+    static final Object lock = new Object();
+    static long nextUpdate = 0;
+
+    ClusterStatusProvider clusterStatusProvider;
+    ClusterContainerStatusProvider containerStatusProvider;
+    HelixManager manager;
+
+    /**
+     * Picks up the status providers from {@link ConfigTool} and remembers the manager.
+     *
+     * @param manager Helix manager later used to write the ideal state and toggle partitions
+     */
+    @Override
+    public void init(HelixManager manager) {
+        this.clusterStatusProvider = ConfigTool.getClusterStatusProvider();
+        this.containerStatusProvider = ConfigTool.getContainerStatusProvider();
+        this.manager = manager;
+    }
+
+    /**
+     * Compares the number of ONLINE (current or pending) containers with the target count
+     * and checks every currently ONLINE container for existence and failure. If the counts
+     * differ or a failure was found, the ideal state is rebuilt with the partitions
+     * {@code <resource>_0 .. <resource>_(target-1)} minus the failed ones, written through
+     * the cluster management tool, and partitions are enabled/disabled on every instance.
+     *
+     * @param resourceName name of the resource being rebalanced
+     * @param currentIdealState ideal state to inspect; modified in place when rebalancing
+     * @param currentStateOutput current and pending states of the resource's partitions
+     * @param clusterData cluster cache; only the instance configuration map is read
+     * @return {@code currentIdealState}, updated if a rebalance was necessary
+     */
+    @Override
+    public IdealState computeNewIdealState(String resourceName,
+            IdealState currentIdealState,
+            CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+
+        // target container count
+        int targetCount = clusterStatusProvider.getTargetContainerCount(resourceName);
+
+        // currently active and currently failed containers
+        int currentCount = 0;
+        List<String> failedPartitions = new ArrayList<String>();
+        for (String partitionName : currentIdealState.getPartitionSet()) {
+            Partition partition = new Partition(partitionName);
+            Map<String, String> currentStateMap = currentStateOutput.getCurrentStateMap(resourceName, partition);
+            Map<String, String> pendingStateMap = currentStateOutput.getPendingStateMap(resourceName, partition);
+
+            boolean currentlyOnline = hasOnlineInstance(currentStateMap);
+            if (currentlyOnline || hasOnlineInstance(pendingStateMap)) {
+                currentCount++;
+            }
+
+            // only containers that are already ONLINE are checked for failures
+            if (!currentlyOnline) {
+                continue;
+            }
+
+            // query existence once so both checks see the same answer
+            if (!containerStatusProvider.exists(partitionName)) {
+                // container listed online, but does not exist
+                log.warn(String.format("Container '%s' designated ONLINE, but does not exist", partitionName));
+                failedPartitions.add(partitionName);
+            } else if (containerStatusProvider.isFailed(partitionName)) {
+                // container listed online and exists, but in failure state
+                log.warn(String.format("Container '%s' designated ONLINE, but in failure state", partitionName));
+                failedPartitions.add(partitionName);
+            }
+        }
+        int failureCount = failedPartitions.size();
+
+        if (currentCount != targetCount || failureCount != 0) {
+            log.info(String.format("Rebalancing containers (current=%d, target=%d, failures=%d)", currentCount, targetCount, failureCount));
+
+            currentIdealState.setNumPartitions(targetCount);
+
+            // future active containers
+            log.debug("active containers");
+            List<String> activePartitions = new ArrayList<String>();
+            for (int i = 0; i < targetCount; i++) {
+                activePartitions.add(resourceName + "_" + i);
+            }
+            activePartitions.removeAll(failedPartitions);
+
+            // future passive containers
+            // NOTE(review): the upper bound is the number of ONLINE partitions, not the
+            // highest partition index; confirm this is intended when some are offline.
+            log.debug("passive containers");
+            List<String> passivePartitions = new ArrayList<String>();
+            for (int i = targetCount; i < currentCount; i++) {
+                passivePartitions.add(resourceName + "_" + i);
+            }
+            passivePartitions.addAll(failedPartitions);
+
+            log.debug("output");
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("%s: failed partitions %s", resourceName, failedPartitions));
+                log.debug(String.format("%s: active partitions %s", resourceName, activePartitions));
+                log.debug(String.format("%s: passive partitions %s", resourceName, passivePartitions));
+            }
+
+            // every active partition gets an empty preference list and an empty state map
+            log.debug("building ideal state");
+            Map<String, List<String>> listFields = new HashMap<String, List<String>>();
+            Map<String, Map<String, String>> mapFields = new HashMap<String, Map<String, String>>();
+            for (String partitionName : activePartitions) {
+                listFields.put(partitionName, new ArrayList<String>());
+                mapFields.put(partitionName, new HashMap<String, String>());
+            }
+            currentIdealState.getRecord().setListFields(listFields);
+            currentIdealState.getRecord().setMapFields(mapFields);
+
+            log.debug("setting ideal state");
+            String clusterName = manager.getClusterName();
+            manager.getClusterManagmentTool().setResourceIdealState(clusterName, resourceName, currentIdealState);
+
+            log.debug("enable partitions");
+            for (String instanceName : clusterData.getInstanceConfigMap().keySet()) {
+                log.debug(String.format("enable partitions for '%s'", instanceName));
+                manager.getClusterManagmentTool().enablePartition(true, clusterName, instanceName, resourceName, activePartitions);
+                log.debug(String.format("disable partitions for '%s'", instanceName));
+                manager.getClusterManagmentTool().enablePartition(false, clusterName, instanceName, resourceName, passivePartitions);
+            }
+
+            log.debug("done");
+        }
+
+        return currentIdealState;
+    }
+
+    /**
+     * Tells whether any instance in the given state map is in state ONLINE.
+     *
+     * @param stateMap instance name to state mapping; may be null or empty
+     * @return true if at least one value equals "ONLINE"
+     */
+    private boolean hasOnlineInstance(Map<String, String> stateMap) {
+        return stateMap != null && stateMap.containsValue("ONLINE");
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/MetaManagerDemo.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/MetaManagerDemo.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/MetaManagerDemo.java
new file mode 100644
index 0000000..d0be313
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/MetaManagerDemo.java
@@ -0,0 +1,457 @@
+package org.apache.helix.metamanager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.metamanager.container.ContainerUtils;
+import org.apache.helix.metamanager.impl.StaticTargetProvider;
+import org.apache.helix.metamanager.impl.local.LocalContainerProvider;
+import org.apache.helix.metamanager.impl.local.LocalStatusProvider;
+import org.apache.helix.metamanager.impl.shell.ShellContainerProvider;
+import org.apache.helix.metamanager.impl.shell.ShellStatusProvider;
+import org.apache.helix.metamanager.impl.yarn.YarnApplicationProperties;
+import org.apache.helix.metamanager.impl.yarn.YarnContainerProvider;
+import org.apache.helix.metamanager.impl.yarn.YarnStatusProvider;
+import org.apache.helix.metamanager.provider.ProviderProcess;
+import org.apache.helix.metamanager.provider.ProviderRebalancer;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.log4j.Logger;
+
+public class MetaManagerDemo
+{
+ static final long TIMESTEP_INTERVAL = 1000; // polling interval in ms; passed to Thread.sleep() while waiting for rebalancing
+
+ static final String PROVIDER_LOCAL = "LOCAL"; // default container provider type used by main()
+ static final String PROVIDER_SHELL = "SHELL"; // presumably selects the shell-based provider -- confirm in createContainerProvider
+ static final String PROVIDER_YARN = "YARN"; // presumably selects the YARN-based provider -- confirm in createContainerProvider
+
+ static final Logger log = Logger.getLogger(MetaManagerDemo.class);
+
+ static final int zkPort = 2199; // port used to build zkAddress below
+ static final String zkAddress = "localhost:" + zkPort; // ZooKeeper address used for admin, controllers and providers
+ static final String metaClusterName = "meta-cluster"; // cluster whose participants are the container providers
+ static final String managedClusterName = "managed-cluster"; // cluster hosting the managed resource
+ static final String metaResourceName = "container"; // meta resource; also set as instance group tag of the managed resource
+ static final String managedResourceName = "database"; // MasterSlave resource in the managed cluster
+
+ static final int numContainerProviders = 3; // number of provider processes started by main()
+ static final int numContainerMax = 7; // scale-up loop runs while the target count is below this value
+ static final int numContainerMin = 3; // initial target count; scale-down loop runs while the target is above it
+ static final int numContainerStep = 2; // amount added/removed per scaling step
+ static final int numContainerReplica = 1; // NOTE(review): not referenced in the visible part of this class
+
+ static final int numManagedPartitions = 10; // partition count of the managed resource
+ static final int numManagedReplica = 2; // replica count set on the managed resource's ideal state
+
+ static List<ContainerProvider> providers = new ArrayList<ContainerProvider>(); // providers destroyed by the shutdown hook
+ static int providerCount = 0; // presumably a running counter for provider names -- confirm in createContainerProvider
+
+ static Collection<Service> services = new ArrayList<Service>(); // services stopped by the shutdown hook
+
+ /**
+  * Runs the meta cluster manager demo: starts a local ZooKeeper, sets up the managed and
+  * the meta cluster, starts the container providers and both controllers, then steps the
+  * target container count up and down while destroying two containers and one provider
+  * in between. After every step the cluster status is printed and the demo waits for
+  * input on stdin. The JVM is exited at the end.
+  *
+  * @param args optional; {@code args[0]} selects the container provider type
+  *          (defaults to {@link #PROVIDER_LOCAL})
+  * @throws Exception declared for callers; failures of the demo run itself are caught and logged
+  */
+ public static void main(String[] args) throws Exception
+ {
+
+     String containerProviderType = PROVIDER_LOCAL;
+     if(args.length >= 1) {
+         containerProviderType = args[0];
+     }
+
+     StaticTargetProvider targetProvider = null;
+     ProviderProcess[] managerProcesses = new ProviderProcess[numContainerProviders];
+
+     HelixManager metaControllerManager = null;
+     HelixManager managedControllerManager = null;
+
+     // best-effort cleanup of containers and services when the JVM goes down
+     Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+         @Override
+         public void run() {
+             log.info("Destroying containers");
+             for (ContainerProvider provider : providers) {
+                 provider.destroyAll();
+             }
+             log.info("Stopping services");
+             for (Service service : services) {
+                 try {
+                     service.stop();
+                 } catch (Exception e) {
+                     // keep stopping the remaining services, but leave a trace
+                     log.warn("Failed to stop service during shutdown", e);
+                 }
+             }
+         }
+     }));
+
+     try
+     {
+         log.info("Starting ZooKeeper");
+         startLocalZookeeper();
+         HelixAdmin admin = new ZKHelixAdmin(zkAddress);
+
+         log.info("Create clusters");
+         admin.addCluster(metaClusterName, true);
+         admin.addCluster(managedClusterName, true);
+
+         log.info("Create providers");
+         targetProvider = startService(new StaticTargetProvider(Collections.singletonMap(metaResourceName, numContainerMin)));
+         StatusProvider statusProvider = startService(createContainerStatusProvider(containerProviderType));
+
+         log.info("Setup config tool");
+         ConfigTool.setClusterStatusProvider(targetProvider);
+         ConfigTool.setContainerStatusProvider(statusProvider);
+
+         // Managed Cluster
+         log.info("Setup managed cluster");
+         admin.addStateModelDef(managedClusterName, "MasterSlave",
+             new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave()));
+         admin.addResource(managedClusterName, managedResourceName, numManagedPartitions,
+             "MasterSlave", RebalanceMode.FULL_AUTO.toString());
+
+         IdealState managedIdealState = admin.getResourceIdealState(managedClusterName, managedResourceName);
+         managedIdealState.setInstanceGroupTag(metaResourceName);
+         managedIdealState.setReplicas(String.valueOf(numManagedReplica));
+         admin.setResourceIdealState(managedClusterName, managedResourceName, managedIdealState);
+
+         // Meta Cluster
+         log.info("Setup meta cluster");
+         admin.addStateModelDef(metaClusterName, "OnlineOffline",
+             new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline()));
+         admin.addResource(metaClusterName, metaResourceName, targetProvider.getTargetContainerCount(metaResourceName),
+             "OnlineOffline", RebalanceMode.USER_DEFINED.toString());
+
+         IdealState metaIdealState = admin.getResourceIdealState(metaClusterName, metaResourceName);
+         metaIdealState.setRebalancerClassName(ProviderRebalancer.class.getName());
+         metaIdealState.setReplicas("1");
+         admin.setResourceIdealState(metaClusterName, metaResourceName, metaIdealState);
+
+         log.info("Starting meta processes (container providers)");
+         for (int i = 0; i < numContainerProviders; i++)
+         {
+             String instanceName = "provider_" + i;
+             admin.addInstance(metaClusterName, new InstanceConfig(instanceName));
+
+             ClusterAdmin clusterAdmin = new HelixClusterAdmin(managedClusterName, admin);
+
+             managerProcesses[i] = new ProviderProcess(metaClusterName, zkAddress,
+                 instanceName, startService(createContainerProvider(containerProviderType)), clusterAdmin);
+             managerProcesses[i].start();
+         }
+
+         log.info("Starting managed cluster controller");
+         managedControllerManager = HelixControllerMain.startHelixController(zkAddress,
+             managedClusterName, "managedController", HelixControllerMain.STANDALONE);
+         log.info("Starting meta cluster controller");
+         metaControllerManager = HelixControllerMain.startHelixController(zkAddress,
+             metaClusterName, "metaController", HelixControllerMain.STANDALONE);
+
+         waitUntilRebalancedCount(numContainerMin, admin);
+         printStep("Initial cluster state", admin);
+
+         // scale up in steps until the maximum is reached
+         while(targetProvider.getTargetContainerCount(metaResourceName) < numContainerMax) {
+             int newCount = targetProvider.getTargetContainerCount(metaResourceName) + numContainerStep;
+
+             log.info(String.format("Increasing container count to %d", newCount));
+             targetProvider.setTargetContainerCount(metaResourceName, newCount);
+
+             triggerPipeline(admin);
+             waitUntilRebalancedCount(newCount, admin);
+             printStep(String.format("Increased container count to %d", newCount), admin);
+         }
+
+         // simulate container failures and wait for the count to recover
+         log.info("Destroying container 0 and container 1");
+         int currentCount = targetProvider.getTargetContainerCount(metaResourceName);
+         providers.get(0).destroy("container_0");
+         providers.get(0).destroy("container_1");
+         triggerPipeline(admin);
+         waitUntilRebalancedCount(currentCount, admin);
+         printStep("Destroyed container 0 and container 1", admin);
+
+         // simulate the loss of a whole provider process
+         log.info("Destroying container provider 0");
+         currentCount = targetProvider.getTargetContainerCount(metaResourceName);
+         managerProcesses[0].stop();
+         waitUntilRebalancedCount(currentCount, admin);
+         printStep("Destroyed container provider 0", admin);
+
+         // scale down in steps until the minimum is reached
+         while(targetProvider.getTargetContainerCount(metaResourceName) > numContainerMin) {
+             int newCount = targetProvider.getTargetContainerCount(metaResourceName) - numContainerStep;
+
+             log.info(String.format("Decreasing container count to %d", newCount));
+             targetProvider.setTargetContainerCount(metaResourceName, newCount);
+
+             triggerPipeline(admin);
+             waitUntilRebalancedCount(newCount, admin);
+             printStep(String.format("Decreased container count to %d", targetProvider.getTargetContainerCount(metaResourceName)), admin);
+         }
+
+         log.info("Stopping processes");
+
+     } catch (Exception e)
+     {
+         log.error("Demo run failed", e);
+     } finally
+     {
+         if (managedControllerManager != null) {
+             log.info("Disconnecting managed cluster controller");
+             managedControllerManager.disconnect();
+         }
+         if (metaControllerManager != null) {
+             log.info("Disconnecting meta cluster controller");
+             metaControllerManager.disconnect();
+         }
+         log.info("Destroying meta processes");
+         for (ProviderProcess process : managerProcesses) {
+             // entries stay null when startup failed before all providers were created
+             if (process != null) {
+                 process.stop();
+             }
+         }
+     }
+
+     // TODO clean up threads correctly
+     System.exit(0);
+ }
+
+/**
+ * Reads the ideal state of the meta resource and writes it back unchanged.
+ * Presumably the write is what makes the meta cluster controller re-run its pipeline.
+ *
+ * @param admin Helix admin used for both the read and the write
+ */
+private static void triggerPipeline(HelixAdmin admin) {
+    admin.setResourceIdealState(metaClusterName, metaResourceName,
+        admin.getResourceIdealState(metaClusterName, metaResourceName));
+}
+
+ private static void printStep(String text, HelixAdmin admin) throws Exception {
+ log.info("********************************************************************************");
+ log.info(text);
+ log.info("********************************************************************************");
+ printClusterStatus(admin);
+
+ System.out.println("Press ENTER to continue");
+ System.in.read();
+ }
+
+ static void printClusterStatus(HelixAdmin admin) throws Exception { // logs the managed cluster status first, then the meta cluster status
+ log.info("Managed cluster status");
+ printStatusMasterSlave(admin); // master/slave table of the managed resource
+ log.info("Meta cluster status");
+ printMetaClusterStatus(admin); // container -> acquiring provider table of the meta resource
+ }
+
+ /**
+  * Polls every {@code TIMESTEP_INTERVAL} milliseconds until both the meta cluster and the
+  * managed cluster report exactly {@code containerCount} containers, then runs the
+  * best-possible/external-view verifier for the managed cluster. There is no timeout.
+  *
+  * @param containerCount expected number of containers in both clusters
+  * @param admin Helix admin used to read the external views
+  * @throws InterruptedException if the thread is interrupted while sleeping
+  */
+ static void waitUntilRebalancedCount(int containerCount, HelixAdmin admin) throws InterruptedException {
+     boolean settled;
+     do {
+         // always sleep at least once before looking at the cluster
+         Thread.sleep(TIMESTEP_INTERVAL);
+         settled = getMetaContainerCount(admin) == containerCount
+             && getManagedContainerCount(admin) == containerCount;
+     } while (!settled);
+     ClusterStateVerifier.verifyByPolling(new BestPossAndExtViewZkVerifier(zkAddress, managedClusterName));
+ }
+
+ /**
+  * Counts the partitions (containers) of the meta resource for which the external view
+  * lists at least one instance in state ONLINE.
+  *
+  * @param admin Helix admin used to fetch the external view
+  * @return number of partitions with an ONLINE instance; 0 while no external view exists
+  */
+ static int getMetaContainerCount(HelixAdmin admin) {
+     ExternalView externalView = admin.getResourceExternalView(metaClusterName, metaResourceName);
+     if (externalView == null) {
+         // the external view may not have been published yet while callers are polling
+         return 0;
+     }
+
+     // partition names are unique within the partition set, so a plain counter suffices
+     int onlinePartitions = 0;
+     for (String partitionName : externalView.getPartitionSet()) {
+         Map<String, String> stateMap = externalView.getStateMap(partitionName);
+         if (stateMap != null && stateMap.containsValue("ONLINE")) {
+             onlinePartitions++;
+         }
+     }
+     return onlinePartitions;
+ }
+
+  /**
+   * Counts the distinct instances that hold at least one partition of the
+   * managed resource in state {@code MASTER} or {@code SLAVE}.
+   *
+   * @param admin Helix admin handle the external view is read from
+   * @return number of distinct serving instances; {@code 0} if the external
+   *         view of the managed resource is not available (yet)
+   */
+  static int getManagedContainerCount(HelixAdmin admin) {
+    ExternalView externalView = admin.getResourceExternalView(managedClusterName, managedResourceName);
+    if (externalView == null) {
+      // The view may not have been written yet; callers poll this method,
+      // so report "nothing assigned" instead of failing with a NullPointerException.
+      return 0;
+    }
+
+    Set<String> assignedInstances = new HashSet<String>();
+    for (String partitionName : externalView.getPartitionSet()) {
+      Map<String, String> stateMap = externalView.getStateMap(partitionName);
+      if (stateMap == null) {
+        continue;
+      }
+
+      for (Map.Entry<String, String> replica : stateMap.entrySet()) {
+        String state = replica.getValue();
+        if ("MASTER".equals(state) || "SLAVE".equals(state)) {
+          // an instance serving several partitions is only counted once
+          assignedInstances.add(replica.getKey());
+        }
+      }
+    }
+
+    return assignedInstances.size();
+  }
+
+  /**
+   * Logs one line per meta-resource partition (sorted by name) together with
+   * the first instance found in state {@code ONLINE}, or {@code NONE} if there
+   * is no such instance.
+   *
+   * @param admin Helix admin handle the external view is read from
+   */
+  static void printMetaClusterStatus(HelixAdmin admin)
+  {
+    ExternalView view = admin.getResourceExternalView(metaClusterName, metaResourceName);
+    TreeSet<String> sortedPartitions = new TreeSet<String>(view.getPartitionSet());
+
+    log.info("container\tacquired by");
+    log.info("======================================");
+
+    for (String partitionName : sortedPartitions)
+    {
+      String acquiredBy = null;
+      Map<String, String> stateMap = view.getStateMap(partitionName);
+      if (stateMap != null)
+      {
+        for (Map.Entry<String, String> replica : stateMap.entrySet())
+        {
+          if ("ONLINE".equals(replica.getValue()))
+          {
+            acquiredBy = replica.getKey();
+            break;
+          }
+        }
+      }
+
+      String owner;
+      if (acquiredBy == null)
+      {
+        owner = "NONE";
+      }
+      else
+      {
+        owner = acquiredBy;
+      }
+      log.info(partitionName + "\t" + owner);
+    }
+  }
+
+  /**
+   * Logs one line per managed-resource partition (sorted by name) with the
+   * instance in state {@code MASTER} and an instance in state {@code SLAVE}.
+   * {@code NONE} is printed where no such instance exists; if several instances
+   * share a state, the one iterated last is shown.
+   *
+   * @param admin Helix admin handle the external view is read from
+   */
+  static void printStatusMasterSlave(HelixAdmin admin)
+  {
+    ExternalView view = admin.getResourceExternalView(managedClusterName, managedResourceName);
+    TreeSet<String> sortedPartitions = new TreeSet<String>(view.getPartitionSet());
+
+    log.info("partition\tmaster\t\tslave");
+    log.info("============================================================");
+
+    for (String partitionName : sortedPartitions)
+    {
+      String master = "NONE";
+      String slave = "NONE";
+
+      Map<String, String> stateMap = view.getStateMap(partitionName);
+      if (stateMap != null)
+      {
+        for (Map.Entry<String, String> replica : stateMap.entrySet())
+        {
+          String state = replica.getValue();
+          if ("MASTER".equals(state))
+          {
+            master = replica.getKey();
+          }
+          else if ("SLAVE".equals(state))
+          {
+            slave = replica.getKey();
+          }
+        }
+      }
+
+      log.info(String.format("%s\t%s\t%s", partitionName, master, slave));
+    }
+  }
+
+  /**
+   * Starts a ZooKeeper server on {@code zkPort} with data and log directories
+   * below {@code /tmp/metamanager/zk}. Both directories are deleted first.
+   * The server handle is not retained by this method.
+   *
+   * @throws Exception if the directories cannot be deleted or the server fails to start
+   */
+  public static void startLocalZookeeper() throws Exception
+  {
+    final String baseDir = "/tmp/metamanager/";
+    final String dataDir = baseDir + "zk/dataDir";
+    final String logDir = baseDir + "zk/logDir";
+
+    // wipe the directories left behind by a previous run
+    FileUtils.deleteDirectory(new File(dataDir));
+    FileUtils.deleteDirectory(new File(logDir));
+
+    final IDefaultNameSpace emptyNameSpace = new IDefaultNameSpace()
+    {
+      @Override
+      public void createDefaultNameSpace(ZkClient zkClient)
+      {
+        // nothing to create
+      }
+    };
+
+    new ZkServer(dataDir, logDir, emptyNameSpace, zkPort).start();
+  }
+
+ private static ContainerProviderService createContainerProvider(String type) throws Exception {
+ String providerName = "provider_" + providerCount;
+ providerCount++;
+
+ if(PROVIDER_LOCAL.equalsIgnoreCase(type)) {
+ log.info("Using VM-local container provider");
+ LocalContainerProvider provider = new LocalContainerProvider(zkAddress, managedClusterName, providerName);
+ provider.registerType("container", ContainerUtils.getPropertiesFromResource("container.properties"));
+ providers.add(provider);
+ return provider;
+ } else if (PROVIDER_SHELL.equalsIgnoreCase(type)) {
+ log.info("Using shell-based container provider");
+ ShellContainerProvider provider = new ShellContainerProvider(zkAddress, managedClusterName, providerName);
+ provider.registerType("container", ContainerUtils.getPropertiesFromResource("container.properties"));
+ providers.add(provider);
+ return provider;
+ } else if (PROVIDER_YARN.equalsIgnoreCase(type)) {
+ YarnApplicationProperties properties = new YarnApplicationProperties();
+ properties.put(YarnApplicationProperties.HELIX_CLUSTER, managedClusterName);
+ properties.put(YarnApplicationProperties.HELIX_ZOOKEEPER, zkAddress);
+ properties.put(YarnApplicationProperties.PROVIDER_METADATA, zkAddress);
+ properties.put(YarnApplicationProperties.PROVIDER_NAME, providerName);
+
+ log.info("Using yarn-based container provider");
+ YarnContainerProvider yarnProvider = new YarnContainerProvider(properties);
+ yarnProvider.registerType("container", ContainerUtils.getPropertiesFromResource("container.properties"));
+
+ providers.add(yarnProvider);
+ return yarnProvider;
+ } else {
+ throw new IllegalArgumentException(String.format("Unknown container provider type '%s'", type));
+ }
+ }
+
+  /**
+   * Creates the status provider that matches the given provider type.
+   *
+   * @param type provider type, compared case-insensitively against
+   *             {@code PROVIDER_LOCAL}, {@code PROVIDER_SHELL} and {@code PROVIDER_YARN}
+   * @return a new, not yet started status provider
+   * @throws IllegalArgumentException if {@code type} matches none of the known types
+   * @throws Exception if the status provider cannot be created
+   */
+  private static StatusProviderService createContainerStatusProvider(String type) throws Exception {
+    if (PROVIDER_LOCAL.equalsIgnoreCase(type)) {
+      log.info("Using VM-local container status provider");
+      return new LocalStatusProvider();
+    }
+
+    if (PROVIDER_SHELL.equalsIgnoreCase(type)) {
+      log.info("Using shell-based container status provider");
+      return new ShellStatusProvider();
+    }
+
+    if (PROVIDER_YARN.equalsIgnoreCase(type)) {
+      log.info("Using yarn-based container status provider");
+      return new YarnStatusProvider(zkAddress);
+    }
+
+    throw new IllegalArgumentException(String.format("Unknown container status provider type '%s'", type));
+  }
+
+ private static <T extends Service> T startService(T service) throws Exception {
+ service.start();
+ services.add(service);
+ return service;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/Service.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/Service.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/Service.java
new file mode 100644
index 0000000..c13a62e
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/Service.java
@@ -0,0 +1,38 @@
+package org.apache.helix.metamanager;
+
+import java.util.Properties;
+
+/**
+ * Abstraction for a configurable and runnable service. Provides light-weight
+ * dependency injection (via {@link #configure(Properties)}) and life-cycle
+ * management (via {@link #start()} and {@link #stop()}).
+ *
+ */
+public interface Service {
+
+ /**
+ * Configure service internals.<br>
+ * <b>INVARIANT:</b> executed only once
+ *
+ * @param properties
+ * arbitrary key-value properties, parsed internally
+ * @throws Exception
+ * if configuration fails (conditions are implementation-specific)
+ */
+ void configure(Properties properties) throws Exception;
+
+ /**
+ * Start service.<br>
+ * <b>PRECONDITION:</b> configure() was invoked<br>
+ * <b>INVARIANT:</b> executed only once
+ *
+ * @throws Exception
+ * if the service fails to start (conditions are implementation-specific)
+ */
+ void start() throws Exception;
+
+ /**
+ * Stop service.<br>
+ * <b>INVARIANT:</b> idempotent, i.e. repeated invocations must be safe
+ *
+ * @throws Exception
+ * if the service fails to stop (conditions are implementation-specific)
+ */
+ void stop() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StaticStatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StaticStatusProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StaticStatusProvider.java
new file mode 100644
index 0000000..249b9b8
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StaticStatusProvider.java
@@ -0,0 +1,28 @@
+package org.apache.helix.metamanager;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class StaticStatusProvider implements ClusterStatusProvider {
+
+ final Map<String, Integer> targetCounts = new HashMap<String, Integer>();
+
+ public StaticStatusProvider() {
+ // left blank
+ }
+
+ public StaticStatusProvider(Map<String, Integer> targetCounts) {
+ this.targetCounts.putAll(targetCounts);
+ }
+
+ @Override
+ public int getTargetContainerCount(String containerType) {
+ return targetCounts.get(containerType);
+ }
+
+ public void setTargetContainerCount(String containerType, int targetCount) {
+ targetCounts.put(containerType, targetCount);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StatusProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StatusProvider.java
new file mode 100644
index 0000000..841f08d
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StatusProvider.java
@@ -0,0 +1,35 @@
+package org.apache.helix.metamanager;
+
+import org.apache.helix.metamanager.provider.ProviderRebalancer;
+
+/**
+ * Abstraction for the status reader of a container deployment framework.
+ * Provides information on the physical existence of a container and on its
+ * activity or failure state. It is polled by {@link ProviderRebalancer}, so
+ * implementations should be light-weight and non-blocking.<br>
+ * <b>NOTE:</b> This information is solely based on the low-level framework and
+ * may be different from the participant state in Helix. (The Helix participant
+ * may not even exist.)
+ *
+ * @see ProviderRebalancer
+ */
+public interface StatusProvider {
+
+ /**
+ * Determine whether the container physically exists.
+ *
+ * @param id
+ * unique container id
+ * @return true, if the container is present
+ */
+ public boolean exists(String id);
+
+ /**
+ * Determine whether the container is healthy as determined by the deployment
+ * framework.
+ *
+ * @param id
+ * unique container id
+ * @return true, if the container is healthy
+ */
+ public boolean isHealthy(String id);
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StatusProviderService.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StatusProviderService.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StatusProviderService.java
new file mode 100644
index 0000000..3c2739d
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StatusProviderService.java
@@ -0,0 +1,9 @@
+package org.apache.helix.metamanager;
+
+/**
+ * A {@link StatusProvider} that is also a configurable {@link Service}, i.e.
+ * it additionally exposes configure(), start() and stop(). Declares no
+ * methods of its own.
+ *
+ */
+public interface StatusProviderService extends StatusProvider, Service {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/TargetProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/TargetProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/TargetProvider.java
new file mode 100644
index 0000000..22524c4
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/TargetProvider.java
@@ -0,0 +1,25 @@
+package org.apache.helix.metamanager;
+
+import org.apache.helix.metamanager.provider.ProviderRebalancer;
+
+/**
+ * Abstraction for target computation and statistics collection. Provides the
+ * target count of containers for {@link ProviderRebalancer}. It is polled by
+ * ProviderRebalancer, so implementations should be light-weight and
+ * non-blocking.<br>
+ * <b>NOTE:</b> The target count is oblivious of failed containers and can be
+ * obtained in an arbitrary way. See implementations for examples.
+ *
+ * @see ProviderRebalancer
+ */
+public interface TargetProvider {
+
+ /**
+ * Return the target count of containers of a specific type.
+ *
+ * @param containerType
+ * meta resource name
+ * @return container count {@code >= 1}
+ * @throws Exception
+ * if the target count cannot be determined (conditions are
+ * implementation-specific)
+ */
+ public int getTargetContainerCount(String containerType) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/TargetProviderService.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/TargetProviderService.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/TargetProviderService.java
new file mode 100644
index 0000000..4d6275e
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/TargetProviderService.java
@@ -0,0 +1,9 @@
+package org.apache.helix.metamanager;
+
+/**
+ * A {@link TargetProvider} that is also a configurable {@link Service}, i.e.
+ * it additionally exposes configure(), start() and stop(). Declares no
+ * methods of its own.
+ *
+ */
+public interface TargetProviderService extends TargetProvider, Service {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ZookeeperSetter.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ZookeeperSetter.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ZookeeperSetter.java
new file mode 100644
index 0000000..39e20fe
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ZookeeperSetter.java
@@ -0,0 +1,30 @@
+package org.apache.helix.metamanager;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.log4j.Logger;
+
+/**
+ * Utility for setting String values in the embedded zookeeper service.
+ * (Program entry point)
+ *
+ * <p>
+ * Usage: {@code ZookeeperSetter <zkAddress> <path> <value>}
+ */
+public class ZookeeperSetter {
+
+  static Logger log = Logger.getLogger(ZookeeperSetter.class);
+
+  /**
+   * Connects to zookeeper, creates the node (including missing parents) and
+   * writes the given value to it. The connection is closed before returning,
+   * also when the write fails.
+   *
+   * @param args
+   *          {@code args[0]}: zookeeper address, {@code args[1]}: node path,
+   *          {@code args[2]}: value to write; additional arguments are ignored
+   * @throws IllegalArgumentException
+   *           if fewer than three arguments are given
+   */
+  public static void main(String[] args) {
+    if (args == null || args.length < 3) {
+      // fail with a usage hint instead of a bare ArrayIndexOutOfBoundsException
+      throw new IllegalArgumentException("Usage: ZookeeperSetter <zkAddress> <path> <value>");
+    }
+
+    String address = args[0];
+    String path = args[1];
+    String value = args[2];
+
+    log.info(String.format("Setting %s:%s to '%s'", address, path, value));
+
+    ZkClient client = new ZkClient(address);
+    try {
+      client.createPersistent(path, true);
+      client.writeData(path, value);
+    } finally {
+      // release the zookeeper session and its threads
+      client.close();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/bootstrap/BootUtil.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/bootstrap/BootUtil.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/bootstrap/BootUtil.java
new file mode 100644
index 0000000..004de06
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/bootstrap/BootUtil.java
@@ -0,0 +1,58 @@
+package org.apache.helix.metamanager.bootstrap;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.helix.metamanager.container.ContainerProcessProperties;
+import org.apache.log4j.Logger;
+
+/**
+ * Static helpers for bootstrapping components from {@link Properties}:
+ * extracting a prefixed sub-set of properties and reflectively instantiating
+ * the class named by the {@code class} property.
+ */
+public class BootUtil {
+
+  public static final String CLASS_PROPERTY = "class";
+  static final Logger log = Logger.getLogger(BootUtil.class);
+
+  /**
+   * Extracts all entries whose key starts with {@code namespace + "."} and
+   * returns them with that prefix removed.
+   *
+   * <p>
+   * Only entries stored directly in {@code source} are considered (entries
+   * reachable solely through its defaults are not part of the entry set).
+   * All keys and values must be Strings; anything else results in a
+   * {@link ClassCastException}.
+   *
+   * @param namespace
+   *          prefix without trailing dot
+   * @param source
+   *          properties to filter; not modified
+   * @return new properties containing the matching entries, possibly empty
+   */
+  public static Properties getNamespace(String namespace, Properties source) {
+    Properties dest = new Properties();
+    String prefix = namespace + ".";
+
+    for (Map.Entry<Object, Object> rawEntry : source.entrySet()) {
+      String key = (String) rawEntry.getKey();
+      String value = (String) rawEntry.getValue();
+
+      if (key.startsWith(prefix)) {
+        String newKey = key.substring(prefix.length());
+        dest.put(newKey, value);
+      }
+    }
+
+    return dest;
+  }
+
+  /**
+   * Instantiates the class named by the {@link #CLASS_PROPERTY} property.
+   * A public constructor taking {@link ContainerProcessProperties} is tried
+   * first (invoked with {@code properties}); if that is missing or fails, the
+   * public no-argument constructor is tried.
+   *
+   * @param properties
+   *          properties containing at least the {@code class} entry
+   * @return new instance, cast unchecked to the caller's expected type
+   * @throws IllegalArgumentException
+   *           if the {@code class} property is missing
+   * @throws ClassNotFoundException
+   *           if the named class cannot be loaded
+   * @throws Exception
+   *           if neither constructor could be used; the first constructor
+   *           failure, if any, is attached as cause
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T createInstance(Properties properties) throws Exception {
+    String className = properties.getProperty(CLASS_PROPERTY);
+    if (className == null) {
+      // Class.forName(null) would only produce an unexplained NullPointerException
+      throw new IllegalArgumentException(String.format("missing required property '%s'", CLASS_PROPERTY));
+    }
+
+    Class<?> containerClass = Class.forName(className);
+
+    // first real failure (constructor exists but could not be used), kept as cause
+    Exception failure = null;
+
+    try {
+      log.debug(String.format("checking for properties constructor in class '%s'", className));
+      return (T) containerClass.getConstructor(ContainerProcessProperties.class).newInstance(properties);
+    } catch (NoSuchMethodException e) {
+      log.debug("no properties constructor found");
+    } catch (Exception e) {
+      // constructor is present but failed - do not hide this behind "not found"
+      log.warn(String.format("properties constructor of class '%s' failed, trying default constructor", className), e);
+      failure = e;
+    }
+
+    try {
+      log.debug(String.format("checking for default constructor in class '%s'", className));
+      return (T) containerClass.getConstructor().newInstance();
+    } catch (NoSuchMethodException e) {
+      log.debug("no default constructor found");
+    } catch (Exception e) {
+      log.warn(String.format("default constructor of class '%s' failed", className), e);
+      if (failure == null) {
+        failure = e;
+      }
+    }
+
+    throw new Exception(String.format("no suitable constructor for class '%s'", className), failure);
+  }
+
+  private BootUtil() {
+    // left blank
+  }
+
+}