You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/08/14 23:21:11 UTC

[helix] branch dynamically-loaded-task updated: Demo for Autoscaling in Task Framework (#1273)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch dynamically-loaded-task
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/dynamically-loaded-task by this push:
     new 4ca5e86  Demo for Autoscaling in Task Framework (#1273)
4ca5e86 is described below

commit 4ca5e864ed18615c064a273fa2ee715414f69322
Author: rabashizade <67...@users.noreply.github.com>
AuthorDate: Fri Aug 14 19:21:02 2020 -0400

    Demo for Autoscaling in Task Framework (#1273)
    
    Adds the YAML manifests, the required test script, and instructions for
    demoing elastic scaling for Task Framework on Minikube.
---
 demo/elastic-scaling/README                        |  72 ++++++++
 .../helix-controller-deployment.yaml               |  27 +++
 demo/elastic-scaling/helix-core.Dockerfile         |   3 +
 .../helix-participant-deployment.yaml              |  27 +++
 demo/elastic-scaling/helix-rest-deployment.yaml    |  40 +++++
 demo/elastic-scaling/helix-rest.Dockerfile         |   3 +
 .../helix-zookeeper-deployment.yaml                |  53 ++++++
 demo/elastic-scaling/hpa.yaml                      |  34 ++++
 demo/elastic-scaling/plot.py                       |  35 ++++
 demo/elastic-scaling/testing-adapter.yaml          | 184 +++++++++++++++++++++
 .../org/apache/helix/examples/ExampleProcess.java  |   5 +-
 .../helix/manager/zk/ParticipantManager.java       |   9 +
 .../helix/task/TestDemoDynamicTaskLoading.java     |   2 +-
 .../org/apache/helix/task/TestElasticScaling.java  | 145 ++++++++++++++++
 14 files changed, 636 insertions(+), 3 deletions(-)

diff --git a/demo/elastic-scaling/README b/demo/elastic-scaling/README
new file mode 100644
index 0000000..65bbc4b
--- /dev/null
+++ b/demo/elastic-scaling/README
@@ -0,0 +1,72 @@
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+
+This directory contains the YAML manifests for Kubernetes
+objects, Dockerfiles for Docker images of Helix components,
+and the Python script for visualization used in the demo for
+elastic Task Framework. Note that this demo dynamically loads
+the task it runs. If you want statically loaded tasks, you
+need to register the task you want to execute in the
+org.apache.helix.manager.zk.ParticipantManager constructor.
+To run the demo, follow these steps:
+
+1. Start a Kubernetes cluster. You can use Minikube for the
+   purposes of this demo. If it is not already installed,
+   install it first by following the directions found at
+   https://kubernetes.io/docs/tasks/tools/install-minikube/
+
+2. Run the following commands. Wait for the Kubernetes objects
+   related to each command to stabilize before you run the
+   commands in the next step. You can check the status of the
+   objects in the Kubernetes dashboard. If you are using
+   minikube, you can access the dashboard by running
+   "minikube dashboard" in a separate terminal.
+
+   You also need to have kubectl installed to run these
+   commands. Instructions to do so can be found at:
+   https://kubernetes.io/docs/tasks/tools/install-kubectl/
+
+   a. Deploy ZooKeeper:
+      > kubectl apply -f helix-zookeeper-deployment.yaml
+
+   b. Deploy REST server:
+      > kubectl apply -f helix-rest-deployment.yaml
+
+   c. Create a cluster called "MYCLUSTER" and add Helix
+      Controller and Participant Deployments to it:
+      > curl -X PUT http://172.17.0.3:30000/admin/v2/clusters/MYCLUSTER
+      > kubectl apply -f helix-controller-deployment.yaml
+      > kubectl apply -f helix-participant-deployment.yaml
+
+      NOTE: The Minikube IP is assumed to be 172.17.0.3. Change
+      it to your Minikube instance's IP here, and use that same
+      IP for zkAddr in org.apache.helix.task.TestElasticScaling.
+
+   d. Deploy the custom metrics server and create the Horizontal
+      Pod Autoscaler. Source code for the custom metrics server
+      and instructions on how to build it can be found at
+      https://github.com/rabashizade/custom-metrics-apiserver
+      > kubectl apply -f testing-adapter.yaml
+      > kubectl apply -f hpa.yaml
+
+3. (Optional) Run plot.py in a separate terminal for
+   visualization.
+
+4. Run the test script in
+   org.apache.helix.task.TestElasticScaling.
diff --git a/demo/elastic-scaling/helix-controller-deployment.yaml b/demo/elastic-scaling/helix-controller-deployment.yaml
new file mode 100644
index 0000000..78f1acc
--- /dev/null
+++ b/demo/elastic-scaling/helix-controller-deployment.yaml
@@ -0,0 +1,27 @@
+apiVersion: apps/v1  # Deployment running one Helix controller pod for cluster MYCLUSTER
+kind: Deployment
+metadata:
+  name: helix-controller-deployment
+  labels:
+    app: controller-deployment
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: controller
+  template:
+    metadata:
+      labels:
+        app: controller
+    spec:
+      volumes:
+      - name: host-volume
+        hostPath:
+          path: /home/docker  # node directory backing the container's /tmp (see volumeMounts)
+      containers:
+      - image: rabashizade/helix-core:1.3
+        name: helix-controller
+        volumeMounts:
+        - mountPath: /tmp  # controller.log written here lands in the node's /home/docker
+          name: host-volume
+        command: ["/bin/sh", "-c", "/target/helix-core-pkg/bin/run-helix-controller.sh --zkSvr helix-zookeeper-service-internal:2199 --cluster MYCLUSTER > /tmp/controller.log 2>&1"]  # redirect stdout first, then 2>&1, so stderr is captured in the log too
diff --git a/demo/elastic-scaling/helix-core.Dockerfile b/demo/elastic-scaling/helix-core.Dockerfile
new file mode 100644
index 0000000..373c867
--- /dev/null
+++ b/demo/elastic-scaling/helix-core.Dockerfile
@@ -0,0 +1,3 @@
+FROM openjdk:8-jre-alpine3.9
+
+COPY ../../helix-core/ ./
diff --git a/demo/elastic-scaling/helix-participant-deployment.yaml b/demo/elastic-scaling/helix-participant-deployment.yaml
new file mode 100644
index 0000000..9175321
--- /dev/null
+++ b/demo/elastic-scaling/helix-participant-deployment.yaml
@@ -0,0 +1,27 @@
+apiVersion: apps/v1  # Deployment running the Helix participant pods for cluster MYCLUSTER
+kind: Deployment
+metadata:
+  name: helix-participant-deployment
+  labels:
+    app: participant-deployment
+spec:
+  replicas: 1  # initial replica count
+  selector:
+    matchLabels:
+      app: participant
+  template:
+    metadata:
+      labels:
+        app: participant
+    spec:
+      volumes:
+      - name: host-volume
+        hostPath:
+          path: /home/docker  # node directory backing the container's /tmp (see volumeMounts)
+      containers:
+      - image: rabashizade/helix-core:1.3
+        name: helix-participant
+        volumeMounts:
+        - mountPath: /tmp  # participant.log written here lands in the node's /home/docker
+          name: host-volume
+        command: ["/bin/sh", "-c", "/target/helix-core-pkg/bin/start-helix-participant.sh --zkSvr helix-zookeeper-service-internal:2199 --cluster MYCLUSTER --port 12900 --stateModelType MasterSlave > /tmp/participant.log 2>&1"]  # redirect stdout first, then 2>&1, so stderr is captured in the log too
diff --git a/demo/elastic-scaling/helix-rest-deployment.yaml b/demo/elastic-scaling/helix-rest-deployment.yaml
new file mode 100644
index 0000000..37ac5bc
--- /dev/null
+++ b/demo/elastic-scaling/helix-rest-deployment.yaml
@@ -0,0 +1,40 @@
+apiVersion: apps/v1  # Deployment running one Helix REST admin server pod on port 30000
+kind: Deployment
+metadata:
+  name: helix-rest-deployment
+  labels:
+    app: rest-deployment
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: rest
+  template:
+    metadata:
+      labels:
+        app: rest
+    spec:
+      volumes:
+      - name: host-volume
+        hostPath:
+          path: /home/docker  # node directory backing the container's /tmp (see volumeMounts)
+      containers:
+      - image: rabashizade/helix-rest:1.1
+        name: helix-rest
+        volumeMounts:
+        - mountPath: /tmp  # rest.log written here lands in the node's /home/docker
+          name: host-volume
+        command: ["/bin/sh", "-c", "/target/helix-rest-pkg/bin/run-rest-admin.sh --port 30000 --zkSvr helix-zookeeper-service-internal:2199 > /tmp/rest.log 2>&1"]  # redirect stdout first, then 2>&1, so stderr is captured in the log too
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: helix-rest-service
+spec:
+  type: NodePort
+  selector:
+    app: rest
+  ports:
+  - protocol: TCP
+    port: 30000
+    nodePort: 30000
diff --git a/demo/elastic-scaling/helix-rest.Dockerfile b/demo/elastic-scaling/helix-rest.Dockerfile
new file mode 100644
index 0000000..cc41fd8
--- /dev/null
+++ b/demo/elastic-scaling/helix-rest.Dockerfile
@@ -0,0 +1,3 @@
+FROM openjdk:8-jre-alpine3.9
+
+COPY ../../helix-rest/ ./
diff --git a/demo/elastic-scaling/helix-zookeeper-deployment.yaml b/demo/elastic-scaling/helix-zookeeper-deployment.yaml
new file mode 100644
index 0000000..ba29c49
--- /dev/null
+++ b/demo/elastic-scaling/helix-zookeeper-deployment.yaml
@@ -0,0 +1,53 @@
+apiVersion: apps/v1  # Deployment running one standalone ZooKeeper pod listening on port 2199
+kind: Deployment
+metadata:
+  name: zookeeper-deployment
+  labels:
+    app: zookeeper-deployment
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: zookeeper
+  template:
+    metadata:
+      labels:
+        app: zookeeper
+    spec:
+      volumes:
+      - name: host-volume
+        hostPath:
+          path: /home/docker  # node directory backing the container's /tmp (see volumeMounts)
+      containers:
+      - image: rabashizade/helix-core:1.3
+        name: helix-zookeeper
+        volumeMounts:
+        - mountPath: /tmp  # zookeeper.log written here lands in the node's /home/docker
+          name: host-volume
+        command: ["/bin/sh", "-c", "/target/helix-core-pkg/bin/start-standalone-zookeeper.sh 2199 > /tmp/zookeeper.log 2>&1"]  # redirect stdout first, then 2>&1, so stderr is captured in the log too
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: helix-zookeeper-service-external
+spec:
+  type: NodePort
+  selector:
+    app: zookeeper
+  ports:
+  - protocol: TCP
+    port: 2199
+    targetPort: 2199
+    nodePort: 30100
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: helix-zookeeper-service-internal
+spec:
+  selector:
+    app: zookeeper
+  ports:
+  - protocol: TCP
+    port: 2199
+    targetPort: 2199
diff --git a/demo/elastic-scaling/hpa.yaml b/demo/elastic-scaling/hpa.yaml
new file mode 100644
index 0000000..65c50dd
--- /dev/null
+++ b/demo/elastic-scaling/hpa.yaml
@@ -0,0 +1,34 @@
+apiVersion: autoscaling/v2beta2
+kind: HorizontalPodAutoscaler
+metadata:
+  name: helix-participant-hpa
+  namespace: default
+spec:
+  scaleTargetRef:
+    apiVersion: apps/v1
+    kind: Deployment
+    name: helix-participant-deployment
+  minReplicas: 1
+  maxReplicas: 10
+  metrics:
+  - type: External
+    external:
+      metric:
+        name: jobs-number
+      target:
+        type: AverageValue
+        averageValue: 1
+  behavior:
+    scaleDown:
+      stabilizationWindowSeconds: 0
+      policies:
+      - type: Percent
+        value: 100
+        periodSeconds: 1
+    scaleUp:
+      stabilizationWindowSeconds: 0
+      policies:
+      - type: Pods
+        value: 10
+        periodSeconds: 1
+      selectPolicy: Max
diff --git a/demo/elastic-scaling/plot.py b/demo/elastic-scaling/plot.py
new file mode 100644
index 0000000..d3e24b1
--- /dev/null
+++ b/demo/elastic-scaling/plot.py
@@ -0,0 +1,35 @@
+import pandas as pd
+import matplotlib.pyplot as plt
+from matplotlib.animation import FuncAnimation
+import csv
+
+x_values = []
+w_values = []
+li_values = []
+
+with open("demo.csv", 'w') as csvfile:
+    csvwriter = csv.writer(csvfile)
+    csvwriter.writerow(['Time', 'Workflows', 'Live Instances'])
+
+fig, (ax1, ax2) = plt.subplots(2,1)
+def animate(self):
+    data = pd.read_csv('demo.csv')
+    x_values = data['Time']
+    w_values = data['Workflows']
+    li_values = data['Live Instances']
+    ax1.cla()
+    ax1.plot(x_values, w_values)
+    #ax1.xlabel('Time')
+    ax1.grid()
+    ax1.legend(['Workflows',], loc='upper left')
+    ax2.cla()
+    ax2.plot(x_values, li_values, color='red')
+    #ax2.xlabel('Time')
+    ax2.grid()
+    ax2.legend(['Live Instances'], loc='upper left')
+    plt.xlabel('Time (s)')
+
+ani = FuncAnimation(plt.gcf(), animate, 5000)
+
+plt.tight_layout()
+plt.show()
diff --git a/demo/elastic-scaling/testing-adapter.yaml b/demo/elastic-scaling/testing-adapter.yaml
new file mode 100644
index 0000000..16e0441
--- /dev/null
+++ b/demo/elastic-scaling/testing-adapter.yaml
@@ -0,0 +1,184 @@
+apiVersion: v1
+kind: Namespace
+metadata:
+  name: custom-metrics
+---
+kind: ServiceAccount
+apiVersion: v1
+metadata:
+  name: custom-metrics-apiserver
+  namespace: custom-metrics
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: custom-metrics:system:auth-delegator
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: system:auth-delegator
+subjects:
+- kind: ServiceAccount
+  name: custom-metrics-apiserver
+  namespace: custom-metrics
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+  name: custom-metrics-auth-reader
+  namespace: kube-system
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: Role
+  name: extension-apiserver-authentication-reader
+subjects:
+- kind: ServiceAccount
+  name: custom-metrics-apiserver
+  namespace: custom-metrics
+---
+apiVersion: apps/v1  # Deployment running one custom/external metrics adapter pod
+kind: Deployment
+metadata:
+  labels:
+    app: custom-metrics-apiserver
+  name: custom-metrics-apiserver
+  namespace: custom-metrics
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: custom-metrics-apiserver
+  template:
+    metadata:
+      labels:
+        app: custom-metrics-apiserver
+      name: custom-metrics-apiserver
+    spec:
+      serviceAccountName: custom-metrics-apiserver
+      containers:
+      - name: custom-metrics-apiserver
+        image: rabashizade/k8s-test-metrics-adapter-amd64:latest
+        imagePullPolicy: IfNotPresent
+        ports:
+        - containerPort: 6443  # matches --secure-port in the command below
+          name: https
+        - containerPort: 8080
+          name: http
+        volumeMounts:
+        - mountPath: /tmp  # metrics-server.log written here lands in the node's /home/docker
+          name: host-volume
+        command: ["/bin/sh", "-c", "/adapter --secure-port=6443 --logtostderr=true --v=10 > /tmp/metrics-server.log 2>&1"]  # logs go to stderr (--logtostderr), so 2>&1 must come after the file redirect
+      volumes:
+      - name: host-volume
+        hostPath:
+          path: /home/docker
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: custom-metrics-resource-reader
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: custom-metrics-resource-reader
+subjects:
+- kind: ServiceAccount
+  name: custom-metrics-apiserver
+  namespace: custom-metrics
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: custom-metrics-apiserver
+  namespace: custom-metrics
+spec:
+  ports:
+  - name: https
+    port: 443
+    targetPort: 6443
+  - name: http
+    port: 80
+    targetPort: 8080
+  selector:
+    app: custom-metrics-apiserver
+---
+apiVersion: apiregistration.k8s.io/v1beta1
+kind: APIService
+metadata:
+  name: v1beta1.custom.metrics.k8s.io
+spec:
+  service:
+    name: custom-metrics-apiserver
+    namespace: custom-metrics
+  group: custom.metrics.k8s.io
+  version: v1beta1
+  insecureSkipTLSVerify: true
+  groupPriorityMinimum: 100
+  versionPriority: 100
+---
+apiVersion: apiregistration.k8s.io/v1beta1
+kind: APIService
+metadata:
+  name: v1beta1.external.metrics.k8s.io
+spec:
+  service:
+    name: custom-metrics-apiserver
+    namespace: custom-metrics
+  group: external.metrics.k8s.io
+  version: v1beta1
+  insecureSkipTLSVerify: true
+  groupPriorityMinimum: 100
+  versionPriority: 100
+---
+apiVersion: apiregistration.k8s.io/v1beta1
+kind: APIService
+metadata:
+  name: v1beta2.custom.metrics.k8s.io
+spec:
+  service:
+    name: custom-metrics-apiserver
+    namespace: custom-metrics
+  group: custom.metrics.k8s.io
+  version: v1beta2
+  insecureSkipTLSVerify: true
+  groupPriorityMinimum: 100
+  versionPriority: 200
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: custom-metrics-server-resources
+rules:
+- apiGroups:
+  - custom.metrics.k8s.io
+  - external.metrics.k8s.io
+  resources: ["*"]
+  verbs: ["*"]
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: custom-metrics-resource-reader
+rules:
+- apiGroups:
+  - ""
+  resources:
+  - namespaces
+  - pods
+  - services
+  verbs:
+  - get
+  - list
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: hpa-controller-custom-metrics
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: custom-metrics-server-resources
+subjects:
+- kind: ServiceAccount
+  name: horizontal-pod-autoscaler
+  namespace: kube-system
diff --git a/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java b/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java
index fd73013..6d383e7 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java
@@ -20,6 +20,7 @@ package org.apache.helix.examples;
  */
 
 import java.io.File;
+import java.net.InetAddress;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -124,7 +125,7 @@ public class ExampleProcess {
     Option hostOption =
         OptionBuilder.withLongOpt(hostAddress).withDescription("Provide host name").create();
     hostOption.setArgs(1);
-    hostOption.setRequired(true);
+    hostOption.setRequired(false);
     hostOption.setArgName("Host name (Required)");
 
     Option portOption =
@@ -204,7 +205,7 @@ public class ExampleProcess {
       zkConnectString = cmd.getOptionValue(zkServer);
       clusterName = cmd.getOptionValue(cluster);
 
-      String host = cmd.getOptionValue(hostAddress);
+      String host = InetAddress.getLocalHost().getHostAddress();
       String portString = cmd.getOptionValue(hostPort);
       int port = Integer.parseInt(portString);
       instanceName = host + "_" + port;
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 88e7f7b..a7b93fc 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -23,6 +23,7 @@ import java.lang.management.ManagementFactory;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -52,6 +53,9 @@ import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -118,6 +122,10 @@ public class ParticipantManager {
     _liveInstanceInfoProvider = liveInstanceInfoProvider;
     _preConnectCallbacks = preConnectCallbacks;
     _helixManagerProperty = helixManagerProperty;
+
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    _stateMachineEngine.registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
+        new TaskStateModelFactory(_manager, taskFactoryReg));
   }
 
   /**
@@ -168,6 +176,7 @@ public class ParticipantManager {
     try {
       HelixConfigScope scope = new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER)
           .forCluster(_manager.getClusterName()).build();
+      _configAccessor.set(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "true");
       autoJoin = Boolean
           .parseBoolean(_configAccessor.get(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN));
       LOG.info("instance: " + _instanceName + " auto-joining " + _clusterName + " is " + autoJoin);
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestDemoDynamicTaskLoading.java b/helix-core/src/test/java/org/apache/helix/task/TestDemoDynamicTaskLoading.java
index 66100ad..dd74817 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestDemoDynamicTaskLoading.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestDemoDynamicTaskLoading.java
@@ -118,7 +118,7 @@ public class TestDemoDynamicTaskLoading extends ZkTestBase {
     submitWorkflow(workflowName, driver);
 
     // Wait for the workflow to either complete or fail.
-    Thread.sleep(100000);
+    //Thread.sleep(100000);
     TaskState finalState =
         driver.pollForWorkflowState(workflowName, TaskState.COMPLETED, TaskState.FAILED);
     Assert.assertEquals(finalState, TaskState.COMPLETED);
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestElasticScaling.java b/helix-core/src/test/java/org/apache/helix/task/TestElasticScaling.java
new file mode 100644
index 0000000..34b381f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestElasticScaling.java
@@ -0,0 +1,145 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.model.ClusterConfig;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestElasticScaling extends TaskSynchronizedTestBase {
+  private static final String DEFAULT_QUOTA_TYPE = "DEFAULT";
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    setSingleTestEnvironment();
+    super.beforeClass();
+  }
+
+  /** Blocks the calling thread for {@code time} milliseconds without spinning the CPU. */
+  private void sleep(int time) {
+    try {
+      Thread.sleep(time);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+    }
+  }
+
+  /**
+   * Samples the cluster about once per second for {@code testTime} ms, appending
+   * "sample,workflowCount,liveInstanceCount" rows to the demo trace CSV.
+   */
+  private void monitorZK(HelixManager tmpManager, TaskDriver driver, int testTime) {
+    String traceFilePath = "../demo/elastic-scaling/demo.csv";
+    try (FileWriter file = new FileWriter(traceFilePath, true)) {
+      long startTime = System.currentTimeMillis();
+      for (int time = 0; System.currentTimeMillis() - startTime < testTime; ++time) {
+        sleep(1000);
+        int workflows = driver.getWorkflows().size();
+        int liveInstances = tmpManager.getHelixDataAccessor()
+            .getChildNames(tmpManager.getHelixDataAccessor().keyBuilder().liveInstances()).size();
+        file.write(time + "," + workflows + "," + liveInstances + "\n");
+        file.flush();
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("Failed to write " + traceFilePath, e);
+    }
+  }
+
+  /** Starts {@code workflowName} with one single-task job whose JOB_DELAY is {@code jobDelay}. */
+  private void submitTestWorkflow(String workflowName, TaskDriver driver, String jobDelay) {
+    JobConfig.Builder job = new JobConfig.Builder();
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, jobDelay));
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    workflow.setExpiry(1);
+    job.setWorkflow(workflowName);
+    TaskConfig taskConfig =
+        new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null);
+    job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig));
+    job.setJobId(TaskUtil.getNamespacedJobName(workflowName, "JOB"));
+    workflow.addJob("JOB", job);
+    driver.start(workflow.build());
+  }
+
+  /** Runs the demo: registers the dynamic task, then submits two waves of 15 workflows. */
+  @Test
+  public void testElasticScaling() throws Exception {
+    String clusterName = "MYCLUSTER";
+    String zkAddr = "172.17.0.3:30100";
+    String taskCommand = "Reindex";
+    String taskJarPath = "src/test/resources/Reindex.jar";
+    String taskVersion = "1.0.0";
+    String fullyQualifiedTaskClassName = "com.mycompany.mocktask.MockTask";
+    String fullyQualifiedTaskFactoryClassName = "com.mycompany.mocktask.MockTaskFactory";
+    HelixManager tmpManager = HelixManagerFactory.getZKHelixManager(clusterName, "Admin",
+        InstanceType.ADMINISTRATOR, zkAddr);
+    tmpManager.connect();
+    ExecutorService service = Executors.newFixedThreadPool(1);
+    try {
+      // Add task definition information as a DynamicTaskConfig.
+      List<String> taskClasses = new ArrayList<>();
+      taskClasses.add(fullyQualifiedTaskClassName);
+      DynamicTaskConfig taskConfig = new DynamicTaskConfig(taskCommand, taskJarPath, taskVersion,
+          taskClasses, fullyQualifiedTaskFactoryClassName);
+      String path = TaskConstants.DYNAMICALLY_LOADED_TASK_PATH + "/" + taskCommand;
+      tmpManager.getHelixDataAccessor().getBaseDataAccessor()
+          .create(path, taskConfig.getTaskConfigZNRecord(), AccessOption.PERSISTENT);
+
+      // Set the DEFAULT quota ratio to 1 (vs. 39 for type "A") so autoscaling is easier to see
+      ClusterConfig clusterConfig = tmpManager.getConfigAccessor().getClusterConfig(clusterName);
+      clusterConfig.resetTaskQuotaRatioMap();
+      clusterConfig.setTaskQuotaRatio(DEFAULT_QUOTA_TYPE, 1);
+      clusterConfig.setTaskQuotaRatio("A", 39);
+      tmpManager.getConfigAccessor().setClusterConfig(clusterName, clusterConfig);
+
+      // Launch another thread to monitor the cluster
+      TaskDriver driver = new TaskDriver(tmpManager);
+      java.util.concurrent.Future<?> monitor =
+          service.submit(() -> monitorZK(tmpManager, driver, 200000));
+
+      // Submit workflows for the demo in two waves (0-14, then 15-29), pausing 80s after each
+      sleep(1000);
+      for (int wave = 0; wave < 2; wave++) {
+        for (int i = wave * 15; i < (wave + 1) * 15; i++) {
+          submitTestWorkflow("Workflow" + i, driver, "15000");
+        }
+        sleep(80000);
+      }
+      monitor.get(); // waits for monitoring to finish and rethrows any failure from it
+    } finally {
+      service.shutdown();
+      tmpManager.disconnect();
+    }
+  }
+}