Posted to commits@druid.apache.org by cw...@apache.org on 2020/04/28 10:13:55 UTC
[druid] branch master updated: Adding support for autoscaling in
GCE (#8987)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e7e41e3 Adding support for autoscaling in GCE (#8987)
e7e41e3 is described below
commit e7e41e3a369c236b444dc44338c443c2e46904f1
Author: Francesco Nidito <11...@users.noreply.github.com>
AuthorDate: Tue Apr 28 12:13:39 2020 +0200
Adding support for autoscaling in GCE (#8987)
* Adding support for autoscaling in GCE
* adding extra google deps also in gce pom
* fix link in doc
* remove unused deps
* adding terms to spelling file
* version in pom 0.17.0-incubating-SNAPSHOT --> 0.18.0-SNAPSHOT
* GCEXyz -> GceXyz in naming for consistency
* add preconditions
* add VisibleForTesting annotation
* typos in comments
* use StringUtils.format instead of String.format
* use custom exception instead of exit
* factorize interval time between retries
* making literal value a constant
* iter all network interfaces
* use provided on google (non api) deps
* adding missing dep
* removing unneeded this and use Objects methods instead of 3-way if in hash and comparison
* adding import
* adding retries around getRunningInstances and adding limit for operation end waiting
* refactor GceEnvironmentConfig.hashCode
* 0.18.0-SNAPSHOT -> 0.19.0-SNAPSHOT
* removing unused config
* adding tests to hash and equals
* adding nullable to waitForOperationEnd
* adding testTerminate
* adding unit tests for createComputeService
* increasing retries in unrelated integration-test to prevent sporadic failure (hopefully)
* reverting queryResponseTemplate change
* adding comment for Compute.Builder.build() returning null
---
distribution/pom.xml | 2 +
docs/configuration/index.md | 8 +-
.../extensions-contrib/gce-extensions.md | 103 +++
docs/development/extensions.md | 1 +
extensions-contrib/gce-extensions/pom.xml | 131 ++++
.../overlord/autoscaling/gce/GceAutoScaler.java | 526 +++++++++++++
.../autoscaling/gce/GceEnvironmentConfig.java | 132 ++++
.../overlord/autoscaling/gce/GceModule.java | 42 +
.../autoscaling/gce/GceServiceException.java | 32 +
.../overlord/autoscaling/gce/GceUtils.java | 73 ++
.../org.apache.druid.initialization.DruidModule | 16 +
.../autoscaling/gce/GceAutoScalerTest.java | 853 +++++++++++++++++++++
.../overlord/autoscaling/gce/GceUtilsTest.java | 96 +++
extensions-core/google-extensions/pom.xml | 2 +-
licenses.yaml | 16 +-
pom.xml | 12 +-
.../druid/server/initialization/JettyTest.java | 2 +-
website/.spelling | 6 +
website/i18n/en.json | 3 +
website/sidebars.json | 1 +
20 files changed, 2048 insertions(+), 9 deletions(-)
diff --git a/distribution/pom.xml b/distribution/pom.xml
index a47ead3..2b5c3d5 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -423,6 +423,8 @@
<argument>org.apache.druid.extensions.contrib:druid-moving-average-query</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-tdigestsketch</argument>
+ <argument>-c</argument>
+ <argument>org.apache.druid.extensions.contrib:gce-extensions</argument>
</arguments>
</configuration>
</execution>
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 982ff5e..e17a83a 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -891,7 +891,7 @@ There are additional configs for autoscaling (if it is enabled):
|Property|Description|Default|
|--------|-----------|-------|
-|`druid.indexer.autoscale.strategy`|Choices are "noop" or "ec2". Sets the strategy to run when autoscaling is required.|noop|
+|`druid.indexer.autoscale.strategy`|Choices are "noop", "ec2" or "gce". Sets the strategy to run when autoscaling is required.|noop|
|`druid.indexer.autoscale.doAutoscale`|If set to "true" autoscaling will be enabled.|false|
|`druid.indexer.autoscale.provisionPeriod`|How often to check whether or not new MiddleManagers should be added.|PT1M|
|`druid.indexer.autoscale.terminatePeriod`|How often to check when MiddleManagers should be removed.|PT5M|
@@ -1115,7 +1115,9 @@ field. If not provided, the default is to not use it at all.
##### Autoscaler
-Amazon's EC2 is currently the only supported autoscaler.
+Amazon's EC2 and Google's GCE are currently the only supported autoscalers.
+
+EC2's autoscaler properties are:
|Property|Description|Default|
|--------|-----------|-------|
@@ -1125,6 +1127,8 @@ Amazon's EC2 is currently the only supported autoscaler.
|`nodeData`|A JSON object that describes how to launch new nodes.|none; required|
|`userData`|A JSON object that describes how to configure new nodes. If you have set druid.indexer.autoscale.workerVersion, this must have a versionReplacementString. Otherwise, a versionReplacementString is not necessary.|none; optional|
+For GCE's properties, please refer to the [gce-extensions](../development/extensions-contrib/gce-extensions.md) documentation.
+
## Data Server
This section contains the configuration options for the processes that reside on Data servers (MiddleManagers/Peons and Historicals) in the suggested [three-server configuration](../design/processes.html#server-types).
diff --git a/docs/development/extensions-contrib/gce-extensions.md b/docs/development/extensions-contrib/gce-extensions.md
new file mode 100644
index 0000000..687f6bb
--- /dev/null
+++ b/docs/development/extensions-contrib/gce-extensions.md
@@ -0,0 +1,103 @@
+---
+id: gce-extensions
+title: "GCE Extensions"
+---
+
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+
+To use this Apache Druid (incubating) extension, make sure to [include](../../development/extensions.md#loading-extensions) `gce-extensions`.
+
+At the moment, this extension only enables Druid to autoscale instances in GCE.
+
+The extension scales instances up and down through GCE's [Managed Instance Groups](https://cloud.google.com/compute/docs/instance-groups/creating-groups-of-managed-instances#resize_managed_group)
+(MIGs from now on). This choice makes the machines easier to configure and
+manage.
+
+For this reason, in order to use this extension, the user must have created:
+1. An instance template with the right machine type and image to be used to run the MiddleManager
+2. A MIG configured to use the instance template created in the point above
+
+Moreover, to be able to rescale the machines in the MIG, the Overlord must run with a service account
+that has the following two scopes from the [Compute Engine API](https://developers.google.com/identity/protocols/googlescopes#computev1):
+- `https://www.googleapis.com/auth/cloud-platform`
+- `https://www.googleapis.com/auth/compute`
+
+## Overlord Dynamic Configuration
+
+The Overlord can dynamically change worker behavior.
+
+The JSON object can be submitted to the Overlord via a POST request at:
+
+```
+http://<OVERLORD_IP>:<port>/druid/indexer/v1/worker
+```
+
+Optional Header Parameters for auditing the config change can also be specified.
+
+|Header Param Name| Description | Default |
+|----------|-------------|---------|
+|`X-Druid-Author`| author making the config change|""|
+|`X-Druid-Comment`| comment describing the change being done|""|
+
+A sample worker config spec is shown below:
+
+```json
+{
+ "autoScaler": {
+ "envConfig" : {
+ "numInstances" : 1,
+ "projectId" : "super-project",
+ "zoneName" : "us-central1-a",
+ "managedInstanceGroupName" : "druid-middlemanagers"
+ },
+ "maxNumWorkers" : 4,
+ "minNumWorkers" : 2,
+ "type" : "gce"
+ }
+}
+```
+
+The configuration of the autoscaler is simple and has only two levels.
+
+The outer level specifies the `type` (always `gce` in this case) and two numeric values,
+`maxNumWorkers` and `minNumWorkers`, which define the bounds within which the
+number of instances must stay at any time. Both must be greater than 0, and `maxNumWorkers`
+must be greater than `minNumWorkers`.
+
+The inner level is the `envConfig`, which specifies:
+
+- `numInstances`: how many workers are spawned at each
+request to provision more workers. It is safe to leave this at `1`.
+- `projectId`: the name of the project in which the MIG resides
+- `zoneName`: the zone in which the MIG resides
+- `managedInstanceGroupName`: the name of the MIG whose instances are created or
+removed
+
+Please refer to the Overlord Dynamic Configuration section in the main [documentation](../../configuration/index.md)
+for parameters other than the ones specified here, such as `selectStrategy` etc.
+
+## Known limitations
+
+- The module internally uses the [ListManagedInstances](https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers/listManagedInstances)
+ API call and, while the API documentation states that the call can be paged through using the
+ `pageToken` argument, the responses do not provide any `nextPageToken` to set that parameter. This means
+ that the extension can safely operate on at most 500 MiddleManager instances at any time (the maximum number
+ of instances returned by a single call).
+
\ No newline at end of file
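Submitting a worker config spec like the sample in the new docs is an ordinary HTTP POST. The following is a minimal sketch using the JDK 11 `java.net.http` client that only builds the request; the Overlord address `localhost:8090` (8090 being the usual default Overlord port), the author and comment values, the placeholder project/zone/MIG names, and the `Content-Type: application/json` header are assumptions, not taken from the docs. The class `GceWorkerConfigRequest` is hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class GceWorkerConfigRequest {

  // Builds (but does not send) a POST of a worker config spec to the Overlord's
  // /druid/indexer/v1/worker endpoint, with the optional auditing headers set.
  static HttpRequest buildRequest(String overlordHostPort, String specJson,
                                  String author, String comment) {
    return HttpRequest.newBuilder()
        .uri(URI.create("http://" + overlordHostPort + "/druid/indexer/v1/worker"))
        .header("Content-Type", "application/json") // assumption: JSON body
        .header("X-Druid-Author", author)
        .header("X-Druid-Comment", comment)
        .POST(HttpRequest.BodyPublishers.ofString(specJson))
        .build();
  }

  public static void main(String[] args) {
    // Placeholder project, zone and MIG names; replace with your own.
    String spec = "{\"autoScaler\": {"
        + "\"envConfig\": {\"numInstances\": 1, \"projectId\": \"super-project\", "
        + "\"zoneName\": \"us-central1-a\", \"managedInstanceGroupName\": \"druid-middlemanagers\"}, "
        + "\"maxNumWorkers\": 4, \"minNumWorkers\": 2, \"type\": \"gce\"}}";

    // Assumed Overlord address.
    HttpRequest request = buildRequest("localhost:8090", spec, "ops-team", "enable GCE autoscaling");
    System.out.println(request.method() + " " + request.uri());

    // Sending it requires a reachable Overlord, e.g.:
    // java.net.http.HttpClient.newHttpClient()
    //     .send(request, java.net.http.HttpResponse.BodyHandlers.ofString());
  }
}
```

Keeping request construction separate from sending makes it easy to inspect the headers and URI before pointing the call at a live cluster.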
diff --git a/docs/development/extensions.md b/docs/development/extensions.md
index 54a50ff..574f67d 100644
--- a/docs/development/extensions.md
+++ b/docs/development/extensions.md
@@ -91,6 +91,7 @@ All of these community extensions can be downloaded using [pull-deps](../operati
|druid-influxdb-emitter|InfluxDB metrics emitter|[link](../development/extensions-contrib/influxdb-emitter.md)|
|druid-momentsketch|Support for approximate quantile queries using the [momentsketch](https://github.com/stanford-futuredata/momentsketch) library|[link](../development/extensions-contrib/momentsketch-quantiles.md)|
|druid-tdigestsketch|Support for approximate sketch aggregators based on [T-Digest](https://github.com/tdunning/t-digest)|[link](../development/extensions-contrib/tdigestsketch-quantiles.md)|
+|gce-extensions|GCE Extensions|[link](../development/extensions-contrib/gce-extensions.md)|
## Promoting community extensions to core extensions
diff --git a/extensions-contrib/gce-extensions/pom.xml b/extensions-contrib/gce-extensions/pom.xml
new file mode 100644
index 0000000..87a0a98
--- /dev/null
+++ b/extensions-contrib/gce-extensions/pom.xml
@@ -0,0 +1,131 @@
+<?xml version="1.0"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <parent>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid</artifactId>
+ <version>0.19.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.druid.extensions.contrib</groupId>
+ <artifactId>gce-extensions</artifactId>
+ <name>gce-extensions</name>
+ <description>Extension to support the autoscaling in GCE</description>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-core</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-indexing-service</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-aws-common</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-processing</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.inject</groupId>
+ <artifactId>guice</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-compute</artifactId>
+ <version>v1-rev214-1.25.0</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.http-client</groupId>
+ <artifactId>google-http-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.http-client</groupId>
+ <artifactId>google-http-client-jackson2</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.api-client</groupId>
+ <artifactId>google-api-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <!-- Tests -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>nl.jqno.equalsverifier</groupId>
+ <artifactId>equalsverifier</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceAutoScaler.java b/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceAutoScaler.java
new file mode 100644
index 0000000..3c8f529
--- /dev/null
+++ b/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceAutoScaler.java
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.autoscaling.gce;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
+import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.services.compute.Compute;
+import com.google.api.services.compute.ComputeScopes;
+import com.google.api.services.compute.model.Instance;
+import com.google.api.services.compute.model.InstanceGroupManagersDeleteInstancesRequest;
+import com.google.api.services.compute.model.InstanceGroupManagersListManagedInstancesResponse;
+import com.google.api.services.compute.model.InstanceList;
+import com.google.api.services.compute.model.ManagedInstance;
+import com.google.api.services.compute.model.NetworkInterface;
+import com.google.api.services.compute.model.Operation;
+import com.google.common.base.Preconditions;
+import com.google.common.net.InetAddresses;
+import org.apache.druid.indexing.overlord.autoscaling.AutoScaler;
+import org.apache.druid.indexing.overlord.autoscaling.AutoScalingData;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This module permits the autoscaling of the workers in GCE
+ *
+ * General notes:
+ * - The IPs are IPs as in Internet Protocol, and they look like 1.2.3.4
+ * - The IDs are the names of the instances created; they look like prefix-abcd,
+ * where the prefix is chosen by you and abcd is a suffix assigned by GCE
+ */
+@JsonTypeName("gce")
+public class GceAutoScaler implements AutoScaler<GceEnvironmentConfig>
+{
+ private static final EmittingLogger log = new EmittingLogger(GceAutoScaler.class);
+
+ private final GceEnvironmentConfig envConfig;
+ private final int minNumWorkers;
+ private final int maxNumWorkers;
+
+ private Compute cachedComputeService = null;
+
+ private static final long POLL_INTERVAL_MS = 5 * 1000; // 5 sec
+ private static final int RUNNING_INSTANCES_MAX_RETRIES = 10;
+ private static final int OPERATION_END_MAX_RETRIES = 10;
+
+ @JsonCreator
+ public GceAutoScaler(
+ @JsonProperty("minNumWorkers") int minNumWorkers,
+ @JsonProperty("maxNumWorkers") int maxNumWorkers,
+ @JsonProperty("envConfig") GceEnvironmentConfig envConfig
+ )
+ {
+ Preconditions.checkArgument(minNumWorkers > 0,
+ "minNumWorkers must be greater than 0");
+ this.minNumWorkers = minNumWorkers;
+ Preconditions.checkArgument(maxNumWorkers > 0,
+ "maxNumWorkers must be greater than 0");
+ Preconditions.checkArgument(maxNumWorkers > minNumWorkers,
+ "maxNumWorkers must be greater than minNumWorkers");
+ this.maxNumWorkers = maxNumWorkers;
+ this.envConfig = envConfig;
+ }
+
+ @Override
+ @JsonProperty
+ public int getMinNumWorkers()
+ {
+ return minNumWorkers;
+ }
+
+ @Override
+ @JsonProperty
+ public int getMaxNumWorkers()
+ {
+ return maxNumWorkers;
+ }
+
+ @Override
+ @JsonProperty
+ public GceEnvironmentConfig getEnvConfig()
+ {
+ return envConfig;
+ }
+
+ @Nullable
+ Compute createComputeServiceImpl()
+ throws IOException, GeneralSecurityException, GceServiceException
+ {
+ HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport();
+ JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
+ GoogleCredential credential = GoogleCredential.getApplicationDefault(
+ httpTransport,
+ jsonFactory
+ );
+ if (credential.createScopedRequired()) {
+ List<String> scopes = new ArrayList<>();
+ scopes.add(ComputeScopes.CLOUD_PLATFORM);
+ scopes.add(ComputeScopes.COMPUTE);
+ credential = credential.createScoped(scopes);
+ }
+
+ if (credential.getClientAuthentication() != null) {
+ throw new GceServiceException("Not using a service account");
+ }
+
+ return new Compute.Builder(httpTransport, jsonFactory, credential)
+ .setApplicationName("DruidAutoscaler")
+ .build();
+ }
+
+ private synchronized Compute createComputeService()
+ throws IOException, GeneralSecurityException, InterruptedException, GceServiceException
+ {
+ final int maxRetries = 5;
+
+ int retries = 0;
+ // This retry loop is here to catch the cases in which the underlying call to
+ // Compute.Builder(...).build() returns null, case that has been experienced
+ // sporadically at start time
+ while (cachedComputeService == null && retries < maxRetries) {
+ if (retries > 0) {
+ Thread.sleep(POLL_INTERVAL_MS);
+ }
+
+ log.info("Creating new ComputeService [%d/%d]", retries + 1, maxRetries);
+
+ try {
+ cachedComputeService = createComputeServiceImpl();
+ retries++;
+ }
+ catch (Throwable e) {
+ log.error(e, "Got Exception in creating the ComputeService");
+ throw e;
+ }
+ }
+ return cachedComputeService;
+ }
+
+ // Used to wait for an operation to finish
+ @Nullable
+ private Operation.Error waitForOperationEnd(
+ Compute compute,
+ Operation operation) throws Exception
+ {
+ String status = operation == null ? null : operation.getStatus();
+ String opId = operation == null ? null : operation.getName();
+ for (int i = 0; i < OPERATION_END_MAX_RETRIES; i++) {
+ if (operation == null || "DONE".equals(status)) {
+ return operation == null ? null : operation.getError();
+ }
+ log.info("Waiting for operation %s to end", opId);
+ Thread.sleep(POLL_INTERVAL_MS);
+ Compute.ZoneOperations.Get get = compute.zoneOperations().get(
+ envConfig.getProjectId(),
+ envConfig.getZoneName(),
+ opId
+ );
+ operation = get.execute();
+ if (operation != null) {
+ status = operation.getStatus();
+ }
+ }
+ throw new InterruptedException(
+ StringUtils.format("Timed out waiting for operation %s to complete", opId)
+ );
+ }
+
+ /**
+ * Resizes envConfig.getManagedInstanceGroupName(), growing it by
+ * envConfig.getNumInstances() new workers (unless the maximum is reached). Returns the
+ * IDs of the workers created
+ */
+ @Override
+ public AutoScalingData provision()
+ {
+ final String project = envConfig.getProjectId();
+ final String zone = envConfig.getZoneName();
+ final int numInstances = envConfig.getNumInstances();
+ final String managedInstanceGroupName = envConfig.getManagedInstanceGroupName();
+
+ try {
+ List<String> before = getRunningInstances();
+ log.debug("Existing instances [%s]", String.join(",", before));
+
+ int toSize = Math.min(before.size() + numInstances, getMaxNumWorkers());
+ if (before.size() >= toSize) {
+ // nothing to scale
+ return new AutoScalingData(new ArrayList<>());
+ }
+ log.info("Asked to provision instances, will resize to %d", toSize);
+
+ Compute computeService = createComputeService();
+ Compute.InstanceGroupManagers.Resize request =
+ computeService.instanceGroupManagers().resize(project, zone,
+ managedInstanceGroupName, toSize);
+
+ Operation response = request.execute();
+ Operation.Error err = waitForOperationEnd(computeService, response);
+ if (err == null || err.isEmpty()) {
+ List<String> after = null;
+ // as the waitForOperationEnd only waits for the operation to be scheduled
+ // this loop waits until the requested machines actually go up (or up to a
+ // certain amount of retries in checking)
+ for (int i = 0; i < RUNNING_INSTANCES_MAX_RETRIES; i++) {
+ after = getRunningInstances();
+ if (after.size() == toSize) {
+ break;
+ }
+ log.info("Machines not up yet, waiting");
+ Thread.sleep(POLL_INTERVAL_MS);
+ }
+ after.removeAll(before); // these should be the new ones
+ log.info("Added instances [%s]", String.join(",", after));
+ return new AutoScalingData(after);
+ } else {
+ log.error("Unable to provision instances: %s", err.toPrettyString());
+ }
+ }
+ catch (Exception e) {
+ log.error(e, "Unable to provision any gce instances.");
+ }
+
+ return new AutoScalingData(new ArrayList<>());
+ }
+
+ /**
+ * Terminates the instances in the list of IPs provided by the caller
+ */
+ @Override
+ public AutoScalingData terminate(List<String> ips)
+ {
+ log.info("Asked to terminate: [%s]", String.join(",", ips));
+
+ if (ips.isEmpty()) {
+ return new AutoScalingData(new ArrayList<>());
+ }
+
+ List<String> nodeIds = ipToIdLookup(ips); // if they are not IPs, they will be unchanged
+ try {
+ return terminateWithIds(nodeIds != null ? nodeIds : new ArrayList<>());
+ }
+ catch (Exception e) {
+ log.error(e, "Unable to terminate any instances.");
+ }
+
+ return new AutoScalingData(new ArrayList<>());
+ }
+
+ private List<String> namesToInstances(List<String> names)
+ {
+ List<String> instances = new ArrayList<>();
+ for (String name : names) {
+ instances.add(
+ // convert the name into a URL's path to be used in calls to the API
+ StringUtils.format("zones/%s/instances/%s", envConfig.getZoneName(), name)
+ );
+ }
+ return instances;
+ }
+
+ /**
+ * Terminates the instances in the list of IDs provided by the caller
+ */
+ @Override
+ public AutoScalingData terminateWithIds(List<String> ids)
+ {
+ log.info("Asked to terminate IDs: [%s]", String.join(",", ids));
+
+ if (ids.isEmpty()) {
+ return new AutoScalingData(new ArrayList<>());
+ }
+
+ try {
+ final String project = envConfig.getProjectId();
+ final String zone = envConfig.getZoneName();
+ final String managedInstanceGroupName = envConfig.getManagedInstanceGroupName();
+
+ List<String> before = getRunningInstances();
+
+ InstanceGroupManagersDeleteInstancesRequest requestBody =
+ new InstanceGroupManagersDeleteInstancesRequest();
+ requestBody.setInstances(namesToInstances(ids));
+
+ Compute computeService = createComputeService();
+ Compute.InstanceGroupManagers.DeleteInstances request =
+ computeService
+ .instanceGroupManagers()
+ .deleteInstances(project, zone, managedInstanceGroupName, requestBody);
+
+ Operation response = request.execute();
+ Operation.Error err = waitForOperationEnd(computeService, response);
+ if (err == null || err.isEmpty()) {
+ List<String> after = null;
+ // as the waitForOperationEnd only waits for the operation to be scheduled
+ // this loop waits until the requested machines actually go down (or up to a
+ // certain amount of retries in checking)
+ for (int i = 0; i < RUNNING_INSTANCES_MAX_RETRIES; i++) {
+ after = getRunningInstances();
+ if (after.size() == (before.size() - ids.size())) {
+ break;
+ }
+ log.info("Machines not down yet, waiting");
+ Thread.sleep(POLL_INTERVAL_MS);
+ }
+ before.removeAll(after); // keep only the ones no more present
+ return new AutoScalingData(before);
+ } else {
+ log.error("Unable to terminate instances: %s", err.toPrettyString());
+ }
+ }
+ catch (Exception e) {
+ log.error(e, "Unable to terminate any instances.");
+ }
+
+ return new AutoScalingData(new ArrayList<>());
+ }
+
+ // Returns the list of the IDs of the machines running in the MIG
+ private List<String> getRunningInstances()
+ {
+ final long maxResults = 500L; // 500 is sadly the max, see below
+
+ ArrayList<String> ids = new ArrayList<>();
+ try {
+ final String project = envConfig.getProjectId();
+ final String zone = envConfig.getZoneName();
+ final String managedInstanceGroupName = envConfig.getManagedInstanceGroupName();
+
+ Compute computeService = createComputeService();
+ Compute.InstanceGroupManagers.ListManagedInstances request =
+ computeService
+ .instanceGroupManagers()
+ .listManagedInstances(project, zone, managedInstanceGroupName);
+ // Notice that, while the docs say otherwise, there is no nextPageToken to page
+ // through results, so everything needs to fit in a single page
+ request.setMaxResults(maxResults);
+ InstanceGroupManagersListManagedInstancesResponse response = request.execute();
+ for (ManagedInstance mi : response.getManagedInstances()) {
+ ids.add(GceUtils.extractNameFromInstance(mi.getInstance()));
+ }
+ log.debug("Found running instances [%s]", String.join(",", ids));
+ }
+ catch (Exception e) {
+ log.error(e, "Unable to get instances.");
+ }
+ return ids;
+ }
+
+ /**
+ * Converts the IPs to IDs
+ */
+ @Override
+ public List<String> ipToIdLookup(List<String> ips)
+ {
+ log.info("Asked IPs -> IDs for: [%s]", String.join(",", ips));
+
+ if (ips.isEmpty()) {
+ return new ArrayList<>();
+ }
+
+ // If the first one is not an IP, just assume all the other ones are not as well and just
+ // return them as they are. This check is here because Druid does not check if IPs are
+ // actually IPs and can send IDs to this function instead
+ if (!InetAddresses.isInetAddress(ips.get(0))) {
+ log.debug("Not IPs, doing nothing");
+ return ips;
+ }
+
+ final String project = envConfig.getProjectId();
+ final String zone = envConfig.getZoneName();
+ try {
+ Compute computeService = createComputeService();
+ Compute.Instances.List request = computeService.instances().list(project, zone);
+ // Cannot filter by IP atm, see below
+ // request.setFilter(GceUtils.buildFilter(ips, "networkInterfaces[0].networkIP"));
+
+ List<String> instanceIds = new ArrayList<>();
+ InstanceList response;
+ do {
+ response = request.execute();
+ if (response.getItems() == null) {
+ continue;
+ }
+ for (Instance instance : response.getItems()) {
+ // This manual lookup is needed because it is currently not possible to filter
+ // by IP, see https://issuetracker.google.com/issues/73455339
+ for (NetworkInterface ni : instance.getNetworkInterfaces()) {
+ if (ips.contains(ni.getNetworkIP())) {
+ instanceIds.add(instance.getName());
+ }
+ }
+ }
+ request.setPageToken(response.getNextPageToken());
+ } while (response.getNextPageToken() != null);
+
+ log.debug("Converted to [%s]", String.join(",", instanceIds));
+ return instanceIds;
+ }
+ catch (Exception e) {
+ log.error(e, "Unable to convert IPs to IDs.");
+ }
+
+ return new ArrayList<>();
+ }
+
+ /**
+ * Converts the IDs to IPs - this is actually never called from the outside but it is called once
+ * from inside the class if terminate is used instead of terminateWithIds
+ */
+ @Override
+ public List<String> idToIpLookup(List<String> nodeIds)
+ {
+ log.info("Asked IDs -> IPs for: [%s]", String.join(",", nodeIds));
+
+ if (nodeIds.isEmpty()) {
+ return new ArrayList<>();
+ }
+
+ final String project = envConfig.getProjectId();
+ final String zone = envConfig.getZoneName();
+
+ try {
+ Compute computeService = createComputeService();
+ Compute.Instances.List request = computeService.instances().list(project, zone);
+ request.setFilter(GceUtils.buildFilter(nodeIds, "name"));
+
+ List<String> instanceIps = new ArrayList<>();
+ InstanceList response;
+ do {
+ response = request.execute();
+ if (response.getItems() == null) {
+ continue;
+ }
+ for (Instance instance : response.getItems()) {
+ // Assuming that every server has at least one network interface...
+ String ip = instance.getNetworkInterfaces().get(0).getNetworkIP();
+ // ...even though some IPs are reported as null at first and only become available later.
+ // Null IPs are skipped here (they are picked up on a later call); this prevents a machine
+ // named 'null' from showing up, which would make the caller wait for it for
+ // maxScalingDuration before doing anything else
+ if (ip != null && !"null".equals(ip)) {
+ instanceIps.add(ip);
+ } else {
+ // log and skip it
+ log.warn("Call returned null IP for %s, skipping", instance.getName());
+ }
+ }
+ request.setPageToken(response.getNextPageToken());
+ } while (response.getNextPageToken() != null);
+
+ return instanceIps;
+ }
+ catch (Exception e) {
+ log.error(e, "Unable to convert IDs to IPs.");
+ }
+
+ return new ArrayList<>();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "gceAutoScaler={" +
+ "envConfig=" + envConfig +
+ ", maxNumWorkers=" + maxNumWorkers +
+ ", minNumWorkers=" + minNumWorkers +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ GceAutoScaler that = (GceAutoScaler) o;
+
+ return Objects.equals(envConfig, that.envConfig) &&
+ minNumWorkers == that.minNumWorkers &&
+ maxNumWorkers == that.maxNumWorkers;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = 0;
+ result = 31 * result + Objects.hashCode(envConfig);
+ result = 31 * result + minNumWorkers;
+ result = 31 * result + maxNumWorkers;
+ return result;
+ }
+}
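The IP-to-name lookup above pages through the instance list and, because the API cannot filter by IP, scans every network interface of every instance. The following is a minimal, self-contained sketch of that paging pattern. `Inst`, `Page` and `PageSource` are hypothetical stand-ins for the google-api-services-compute model classes, not part of any real API. The sketch always advances the page token, even for a page without items, so the loop cannot re-request the same page, and it stops at the first matching interface so each instance is reported once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;

public final class PagedIpLookupSketch {
  /** Hypothetical instance: a name plus the internal IP of each network interface. */
  public static final class Inst {
    final String name;
    final List<String> ips;

    public Inst(String name, String... ips) {
      this.name = name;
      this.ips = Arrays.asList(ips);
    }
  }

  /** Hypothetical page of a list response. */
  public static final class Page {
    final List<Inst> items;     // may be null, like InstanceList.getItems()
    final String nextPageToken; // null on the last page

    public Page(List<Inst> items, String nextPageToken) {
      this.items = items;
      this.nextPageToken = nextPageToken;
    }
  }

  /** Hypothetical page source: returns the page for a token (null means the first page). */
  public interface PageSource {
    Page fetch(String pageToken);
  }

  public static List<String> ipToNames(PageSource source, Set<String> ips) {
    List<String> names = new ArrayList<>();
    String token = null;
    do {
      Page page = source.fetch(token);
      if (page.items != null) {
        for (Inst inst : page.items) {
          // No server-side IP filter: check every interface of every instance.
          for (String ip : inst.ips) {
            if (ips.contains(ip)) {
              names.add(inst.name);
              break; // one match is enough to report this instance
            }
          }
        }
      }
      // Advance even when the page had no items.
      token = page.nextPageToken;
    } while (token != null);
    return names;
  }
}
```

With an empty first page pointing at a second page that holds `foo` (1.2.3.5) and `bar` (10.0.0.1, 1.2.3.4), looking up 1.2.3.4 yields `[bar]`.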
diff --git a/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceEnvironmentConfig.java b/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceEnvironmentConfig.java
new file mode 100644
index 0000000..2bc9df3
--- /dev/null
+++ b/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceEnvironmentConfig.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.autoscaling.gce;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import java.util.Objects;
+
+/**
+ */
+public class GceEnvironmentConfig
+{
+ /**
+ * numInstances: the number of workers to try to spawn at each call to provision
+ * projectId: the ID of the project to operate in
+ * zoneName: the name of the zone to operate in
+ * managedInstanceGroupName: the name of the managed instance group in which workers
+ * are created and deleted
+ *
+ * The minimum and maximum number of workers in the pool (minNumWorkers and maxNumWorkers)
+ * are configured on GceAutoScaler, not here. The caller of the AutoScaler uses them to
+ * decide whether calling provision / terminate can have any effect.
+ */
+ private final int numInstances;
+ private final String projectId;
+ private final String zoneName;
+ private final String managedInstanceGroupName;
+
+ @JsonCreator
+ public GceEnvironmentConfig(
+ @JsonProperty("numInstances") int numInstances,
+ @JsonProperty("projectId") String projectId,
+ @JsonProperty("zoneName") String zoneName,
+ @JsonProperty("managedInstanceGroupName") String managedInstanceGroupName
+ )
+ {
+ Preconditions.checkArgument(numInstances > 0,
+ "numInstances must be greater than 0");
+ this.numInstances = numInstances;
+ this.projectId = Preconditions.checkNotNull(projectId,
+ "projectId must not be null");
+ this.zoneName = Preconditions.checkNotNull(zoneName,
+ "zoneName must not be null");
+ this.managedInstanceGroupName = Preconditions.checkNotNull(
+ managedInstanceGroupName,
+ "managedInstanceGroupName must not be null"
+ );
+ }
+
+ @JsonProperty
+ public int getNumInstances()
+ {
+ return numInstances;
+ }
+
+
+ @JsonProperty
+ String getZoneName()
+ {
+ return zoneName;
+ }
+
+ @JsonProperty
+ String getProjectId()
+ {
+ return projectId;
+ }
+
+ @JsonProperty
+ String getManagedInstanceGroupName()
+ {
+ return managedInstanceGroupName;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "GceEnvironmentConfig={" +
+ "projectId=" + projectId +
+ ", zoneName=" + zoneName +
+ ", numInstances=" + numInstances +
+ ", managedInstanceGroupName=" + managedInstanceGroupName +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ GceEnvironmentConfig that = (GceEnvironmentConfig) o;
+ return (numInstances == that.numInstances &&
+ projectId.equals(that.projectId) &&
+ zoneName.equals(that.zoneName) &&
+ managedInstanceGroupName.equals(that.managedInstanceGroupName));
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = 0;
+ result = 31 * result + Objects.hashCode(projectId);
+ result = 31 * result + Objects.hashCode(zoneName);
+ result = 31 * result + Objects.hashCode(managedInstanceGroupName);
+ result = 31 * result + numInstances;
+ return result;
+ }
+}
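The config class above validates its arguments in the constructor and implements value equality. Below is a Guava-free sketch of the same idea; `EnvConfigSketch` is a hypothetical name. It uses `Objects.requireNonNull` in place of `Preconditions.checkNotNull` (both throw `NullPointerException`) and an explicit `IllegalArgumentException` in place of `Preconditions.checkArgument`. It also uses `Objects.hash`, which seeds the hash with 1 instead of 0, so the values differ from a hand-rolled `31 * result` chain starting at 0, while the equals/hashCode contract holds either way.

```java
import java.util.Objects;

public final class EnvConfigSketch {
  private final int numInstances;
  private final String projectId;
  private final String zoneName;
  private final String managedInstanceGroupName;

  public EnvConfigSketch(int numInstances, String projectId, String zoneName, String managedInstanceGroupName) {
    // Fail fast so that a bad config is rejected when it is deserialized.
    if (numInstances <= 0) {
      throw new IllegalArgumentException("numInstances must be greater than 0");
    }
    this.numInstances = numInstances;
    this.projectId = Objects.requireNonNull(projectId, "projectId must not be null");
    this.zoneName = Objects.requireNonNull(zoneName, "zoneName must not be null");
    this.managedInstanceGroupName =
        Objects.requireNonNull(managedInstanceGroupName, "managedInstanceGroupName must not be null");
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    EnvConfigSketch that = (EnvConfigSketch) o;
    // All fields are non-null after construction, so plain equals() is safe.
    return numInstances == that.numInstances
        && projectId.equals(that.projectId)
        && zoneName.equals(that.zoneName)
        && managedInstanceGroupName.equals(that.managedInstanceGroupName);
  }

  @Override
  public int hashCode() {
    return Objects.hash(projectId, zoneName, managedInstanceGroupName, numInstances);
  }
}
```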
diff --git a/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceModule.java b/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceModule.java
new file mode 100644
index 0000000..ce1d1c1
--- /dev/null
+++ b/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceModule.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.autoscaling.gce;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.inject.Binder;
+import org.apache.druid.initialization.DruidModule;
+
+import java.util.Collections;
+import java.util.List;
+
+public class GceModule implements DruidModule
+{
+ @Override
+ public List<? extends Module> getJacksonModules()
+ {
+ return Collections.singletonList(new SimpleModule("DruidGCEModule").registerSubtypes(GceAutoScaler.class));
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+ }
+}
diff --git a/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceServiceException.java b/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceServiceException.java
new file mode 100644
index 0000000..e618d46
--- /dev/null
+++ b/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceServiceException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.autoscaling.gce;
+
+
+/**
+ * Provides a specialized Exception type for the GCE module
+ */
+public class GceServiceException extends Exception
+{
+ public GceServiceException(String message)
+ {
+ super(message);
+ }
+}
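The commit message mentions using a custom exception instead of exiting and limiting how long to wait for an operation to end. The following minimal sketch shows how a checked exception like this one can report that a bounded wait ran out, leaving the decision to the caller. All names here (`BoundedWaitSketch`, `waitUntil`, `ServiceException`) are hypothetical and are not the patch's actual retry code.

```java
import java.util.function.BooleanSupplier;

public final class BoundedWaitSketch {
  /** Hypothetical checked exception standing in for GceServiceException. */
  public static class ServiceException extends Exception {
    public ServiceException(String message) {
      super(message);
    }
  }

  /**
   * Polls done up to maxAttempts times, sleeping sleepMillis between attempts.
   * Returns the attempt number on which the condition became true.
   */
  public static int waitUntil(BooleanSupplier done, int maxAttempts, long sleepMillis)
      throws ServiceException, InterruptedException {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      if (done.getAsBoolean()) {
        return attempt;
      }
      Thread.sleep(sleepMillis);
    }
    // Signal the failure to the caller instead of terminating the process.
    throw new ServiceException("Condition not met after " + maxAttempts + " attempts");
  }
}
```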
diff --git a/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceUtils.java b/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceUtils.java
new file mode 100644
index 0000000..6e9109f
--- /dev/null
+++ b/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.autoscaling.gce;
+
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Simple collection of utilities extracted to ease testing and simplify the GceAutoScaler class
+ */
+public class GceUtils
+{
+
+ /**
+ * Converts https://www.googleapis.com/compute/v1/projects/X/zones/Y/instances/name-of-the-thing
+ * into just `name-of-the-thing`, as required by the other parts of the API
+ */
+ public static String extractNameFromInstance(String instance)
+ {
+ String name = instance;
+ if (instance != null && !instance.isEmpty()) {
+ int lastSlash = instance.lastIndexOf('/');
+ if (lastSlash > -1) {
+ name = instance.substring(lastSlash + 1);
+ } else {
+ name = instance; // no '/': assume it is already a bare name, not a URI
+ }
+ }
+ return name;
+ }
+
+ /**
+ * Builds a filter expression that matches any of the given terms for 'key',
+ * e.g. (name = "foo") OR (name = "bar")
+ */
+ public static String buildFilter(List<String> list, String key)
+ {
+ if (list == null || list.isEmpty() || key == null || key.isEmpty()) {
+ throw new IllegalArgumentException("Arguments cannot be empty or null");
+ }
+ Iterator<String> it = list.iterator();
+
+ StringBuilder sb = new StringBuilder();
+ sb.append(StringUtils.format("(%s = \"%s\")", key, it.next()));
+ while (it.hasNext()) {
+ sb.append(" OR ").append(StringUtils.format("(%s = \"%s\")", key, it.next()));
+ }
+ return sb.toString();
+ }
+
+ // Utility class: prevent instantiation
+ private GceUtils()
+ {
+ }
+}
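The two helpers above can also be written with the standard library alone. The following sketch (`GceUtilsSketch` is a hypothetical name) relies on `lastIndexOf` returning -1 when there is no '/', so `substring(0)` returns the input unchanged, and on `Collectors.joining` to place " OR " between clauses. Like the original, it does not escape quotes inside the values. For the inputs used in the tests, it produces the same strings, e.g. the filter `(name = "foo") OR (name = "bar")`.

```java
import java.util.List;
import java.util.stream.Collectors;

public final class GceUtilsSketch {
  private GceUtilsSketch() {
    // static helpers only
  }

  /** Returns the part after the last '/', or the input unchanged if it has none (or is null/empty). */
  public static String extractName(String instance) {
    if (instance == null || instance.isEmpty()) {
      return instance;
    }
    // lastIndexOf returns -1 when there is no '/', so this becomes substring(0).
    return instance.substring(instance.lastIndexOf('/') + 1);
  }

  /** Joins one (key = "value") clause per value with " OR ". */
  public static String buildFilter(List<String> values, String key) {
    if (values == null || values.isEmpty() || key == null || key.isEmpty()) {
      throw new IllegalArgumentException("Arguments cannot be empty or null");
    }
    return values.stream()
        .map(v -> "(" + key + " = \"" + v + "\")")
        .collect(Collectors.joining(" OR "));
  }
}
```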
diff --git a/extensions-contrib/gce-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-contrib/gce-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
new file mode 100644
index 0000000..c0fecfd
--- /dev/null
+++ b/extensions-contrib/gce-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.druid.indexing.overlord.autoscaling.gce.GceModule
diff --git a/extensions-contrib/gce-extensions/src/test/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceAutoScalerTest.java b/extensions-contrib/gce-extensions/src/test/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceAutoScalerTest.java
new file mode 100644
index 0000000..4c7e785
--- /dev/null
+++ b/extensions-contrib/gce-extensions/src/test/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceAutoScalerTest.java
@@ -0,0 +1,853 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.autoscaling.gce;
+
+import com.fasterxml.jackson.databind.BeanProperty;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.compute.Compute;
+import com.google.api.services.compute.model.Instance;
+import com.google.api.services.compute.model.InstanceGroupManagersDeleteInstancesRequest;
+import com.google.api.services.compute.model.InstanceGroupManagersListManagedInstancesResponse;
+import com.google.api.services.compute.model.InstanceList;
+import com.google.api.services.compute.model.ManagedInstance;
+import com.google.api.services.compute.model.NetworkInterface;
+import com.google.api.services.compute.model.Operation;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.indexing.overlord.autoscaling.AutoScaler;
+import org.apache.druid.indexing.overlord.autoscaling.AutoScalingData;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ */
+public class GceAutoScalerTest
+{
+ private Compute mockCompute = null;
+ // id -> ip & ip -> id
+ private Compute.Instances mockInstances = null;
+ private Compute.Instances.List mockIpToIdRequest = null;
+ private Compute.Instances.List mockIdToIpRequest = null;
+ // running instances
+ private Compute.InstanceGroupManagers mockInstanceGroupManagers = null;
+ private Compute.InstanceGroupManagers.ListManagedInstances mockInstancesRequest = null;
+ // terminate
+ private Compute.InstanceGroupManagers.DeleteInstances mockDeleteRequest = null;
+ // provision
+ private Compute.InstanceGroupManagers.Resize mockResizeRequest = null;
+
+ @Before
+ public void setUp()
+ {
+ // Create every mock for each test, even though each test only uses a subset of them
+
+ mockCompute = EasyMock.createMock(Compute.class);
+
+ mockInstances = EasyMock.createMock(Compute.Instances.class);
+ mockIpToIdRequest = EasyMock.createMock(Compute.Instances.List.class);
+ mockIdToIpRequest = EasyMock.createMock(Compute.Instances.List.class);
+
+ mockInstanceGroupManagers = EasyMock.createMock(Compute.InstanceGroupManagers.class);
+ mockInstancesRequest = EasyMock.createMock(
+ Compute.InstanceGroupManagers.ListManagedInstances.class
+ );
+
+ mockDeleteRequest = EasyMock.createMock(Compute.InstanceGroupManagers.DeleteInstances.class);
+
+ mockResizeRequest = EasyMock.createMock(Compute.InstanceGroupManagers.Resize.class);
+ }
+
+ @After
+ public void tearDown()
+ {
+ // Not calling verify here, as each test uses a different subset of the mocks
+ }
+
+ private static void verifyAutoScaler(final GceAutoScaler autoScaler)
+ {
+ Assert.assertEquals(1, autoScaler.getEnvConfig().getNumInstances());
+ Assert.assertEquals(4, autoScaler.getMaxNumWorkers());
+ Assert.assertEquals(2, autoScaler.getMinNumWorkers());
+ Assert.assertEquals("winkie-country", autoScaler.getEnvConfig().getZoneName());
+ Assert.assertEquals("super-project", autoScaler.getEnvConfig().getProjectId());
+ Assert.assertEquals("druid-mig", autoScaler.getEnvConfig().getManagedInstanceGroupName());
+ }
+
+ @Test
+ public void testConfig()
+ {
+ final String json = "{\n"
+ + " \"envConfig\" : {\n"
+ + " \"numInstances\" : 1,\n"
+ + " \"projectId\" : \"super-project\",\n"
+ + " \"zoneName\" : \"winkie-country\",\n"
+ + " \"managedInstanceGroupName\" : \"druid-mig\"\n"
+ + " },\n"
+ + " \"maxNumWorkers\" : 4,\n"
+ + " \"minNumWorkers\" : 2,\n"
+ + " \"type\" : \"gce\"\n"
+ + "}";
+
+ final ObjectMapper objectMapper = new DefaultObjectMapper()
+ .registerModules((Iterable<Module>) new GceModule().getJacksonModules());
+ objectMapper.setInjectableValues(
+ new InjectableValues()
+ {
+ @Override
+ public Object findInjectableValue(
+ Object o,
+ DeserializationContext deserializationContext,
+ BeanProperty beanProperty,
+ Object o1
+ )
+ {
+ return null;
+ }
+ }
+ );
+
+ try {
+ final GceAutoScaler autoScaler =
+ (GceAutoScaler) objectMapper.readValue(json, AutoScaler.class);
+ verifyAutoScaler(autoScaler);
+
+ final GceAutoScaler roundTripAutoScaler = (GceAutoScaler) objectMapper.readValue(
+ objectMapper.writeValueAsBytes(autoScaler),
+ AutoScaler.class
+ );
+ verifyAutoScaler(roundTripAutoScaler);
+
+ Assert.assertEquals("Round trip equals", autoScaler, roundTripAutoScaler);
+ }
+ catch (Exception e) {
+ Assert.fail(StringUtils.format("Got exception in test %s", e.getMessage()));
+ }
+ }
+
+ @Test
+ public void testConfigEquals()
+ {
+ EqualsVerifier.forClass(GceEnvironmentConfig.class).withNonnullFields(
+ "projectId", "zoneName", "managedInstanceGroupName", "numInstances"
+ ).usingGetClass().verify();
+ }
+
+ private Instance makeInstance(String name, String ip)
+ {
+ Instance instance = new Instance();
+ instance.setName(name);
+ NetworkInterface net = new NetworkInterface();
+ net.setNetworkIP(ip);
+ instance.setNetworkInterfaces(Collections.singletonList(net));
+ return instance;
+ }
+
+ @Test
+ public void testIpToId()
+ throws IOException, GeneralSecurityException, GceServiceException
+ {
+ GceAutoScaler autoScaler = EasyMock.createMockBuilder(GceAutoScaler.class).withConstructor(
+ int.class,
+ int.class,
+ GceEnvironmentConfig.class
+ ).withArgs(
+ 2,
+ 4,
+ new GceEnvironmentConfig(1, "proj-x", "us-central-1", "druid-mig")
+ ).addMockedMethod(
+ "createComputeServiceImpl"
+ ).createMock();
+
+ EasyMock.expect(autoScaler.createComputeServiceImpl()).andReturn(null);
+ EasyMock.expect(autoScaler.createComputeServiceImpl()).andReturn(mockCompute);
+ EasyMock.replay(autoScaler);
+
+ // empty IPs
+ List<String> ips1 = Collections.emptyList();
+ List<String> ids1 = autoScaler.ipToIdLookup(ips1);
+ Assert.assertEquals(0, ids1.size());
+
+ // actually not IPs
+ List<String> ips2 = Collections.singletonList("foo-bar-baz");
+ List<String> ids2 = autoScaler.ipToIdLookup(ips2);
+ Assert.assertEquals(ips2, ids2);
+
+ // actually IPs
+ Instance i1 = makeInstance("foo", "1.2.3.5"); // not the one we look for
+ Instance i2 = makeInstance("bar", "1.2.3.4"); // the one we do look for
+ InstanceList mockResponse = new InstanceList();
+ mockResponse.setNextPageToken(null);
+ mockResponse.setItems(Arrays.asList(i1, i2));
+
+ EasyMock.expect(mockIpToIdRequest.execute()).andReturn(mockResponse);
+ EasyMock.expect(mockIpToIdRequest.setPageToken(EasyMock.anyString())).andReturn(
+ mockIpToIdRequest // the method needs to return something, what is actually irrelevant here
+ );
+ EasyMock.replay(mockIpToIdRequest);
+
+ EasyMock.expect(mockInstances.list("proj-x", "us-central-1")).andReturn(mockIpToIdRequest);
+ EasyMock.replay(mockInstances);
+
+ EasyMock.expect(mockCompute.instances()).andReturn(mockInstances);
+ EasyMock.replay(mockCompute);
+
+ List<String> ips3 = Collections.singletonList("1.2.3.4");
+ List<String> ids3 = autoScaler.ipToIdLookup(ips3);
+ Assert.assertEquals(1, ids3.size());
+ Assert.assertEquals("bar", ids3.get(0));
+
+ EasyMock.verify(mockCompute);
+ EasyMock.verify(mockInstances);
+ EasyMock.verify(mockIpToIdRequest);
+ }
+
+ @Test
+ public void testIdToIp()
+ throws IOException, GeneralSecurityException, GceServiceException
+ {
+ GceAutoScaler autoScaler = EasyMock.createMockBuilder(GceAutoScaler.class).withConstructor(
+ int.class,
+ int.class,
+ GceEnvironmentConfig.class
+ ).withArgs(
+ 2,
+ 4,
+ new GceEnvironmentConfig(1, "proj-x", "us-central-1", "druid-mig")
+ ).addMockedMethod(
+ "createComputeServiceImpl"
+ ).createMock();
+
+ EasyMock.expect(autoScaler.createComputeServiceImpl()).andReturn(null);
+ EasyMock.expect(autoScaler.createComputeServiceImpl()).andReturn(mockCompute);
+ EasyMock.replay(autoScaler);
+
+ // empty IDs
+ List<String> ids1 = Collections.emptyList();
+ List<String> ips1 = autoScaler.idToIpLookup(ids1);
+ Assert.assertEquals(0, ips1.size());
+
+ // actually IDs
+ Instance i1 = makeInstance("foo", "null"); // invalid ip, not returned
+ Instance i2 = makeInstance("bar", "1.2.3.4"); // valid ip, returned
+ InstanceList mockResponse = new InstanceList();
+ mockResponse.setNextPageToken(null);
+ mockResponse.setItems(Arrays.asList(i1, i2));
+
+ EasyMock.expect(mockIdToIpRequest.setFilter("(name = \"foo\") OR (name = \"bar\")")).andReturn(
+ mockIdToIpRequest // the method needs to return something but it is actually irrelevant
+ );
+ EasyMock.expect(mockIdToIpRequest.execute()).andReturn(mockResponse);
+ EasyMock.expect(mockIdToIpRequest.setPageToken(EasyMock.anyString())).andReturn(
+ mockIdToIpRequest // the method needs to return something but it is actually irrelevant
+ );
+ EasyMock.replay(mockIdToIpRequest);
+
+ EasyMock.expect(mockInstances.list("proj-x", "us-central-1")).andReturn(mockIdToIpRequest);
+ EasyMock.replay(mockInstances);
+
+ EasyMock.expect(mockCompute.instances()).andReturn(mockInstances);
+ EasyMock.replay(mockCompute);
+
+ List<String> ids3 = Arrays.asList("foo", "bar");
+ List<String> ips3 = autoScaler.idToIpLookup(ids3);
+ Assert.assertEquals(1, ips3.size());
+ Assert.assertEquals("1.2.3.4", ips3.get(0));
+
+ EasyMock.verify(mockCompute);
+ EasyMock.verify(mockInstances);
+ EasyMock.verify(mockIdToIpRequest);
+ }
+
+ private InstanceGroupManagersListManagedInstancesResponse createRunningInstances(
+ List<String> instances
+ )
+ {
+ InstanceGroupManagersListManagedInstancesResponse mockResponse =
+ new InstanceGroupManagersListManagedInstancesResponse();
+ mockResponse.setManagedInstances(new ArrayList<>());
+ for (String x : instances) {
+ ManagedInstance mi = new ManagedInstance();
+ mi.setInstance(x);
+ mockResponse.getManagedInstances().add(mi);
+ }
+ return mockResponse;
+ }
+
+ @Test
+ public void testTerminateWithIds()
+ throws IOException, GeneralSecurityException, GceServiceException
+ {
+ GceAutoScaler autoScaler = EasyMock.createMockBuilder(GceAutoScaler.class).withConstructor(
+ int.class,
+ int.class,
+ GceEnvironmentConfig.class
+ ).withArgs(
+ 2,
+ 4,
+ new GceEnvironmentConfig(1, "proj-x", "us-central-1", "druid-mig")
+ ).addMockedMethod(
+ "createComputeServiceImpl"
+ ).createMock();
+
+ EasyMock.expect(autoScaler.createComputeServiceImpl()).andReturn(null);
+ EasyMock.expect(autoScaler.createComputeServiceImpl()).andReturn(mockCompute);
+ EasyMock.replay(autoScaler);
+
+ // set up getRunningInstances results
+ InstanceGroupManagersListManagedInstancesResponse beforeRunningInstance =
+ createRunningInstances(Arrays.asList(
+ "http://xyz/foo",
+ "http://xyz/bar",
+ "http://xyz/baz"
+ ));
+ InstanceGroupManagersListManagedInstancesResponse afterRunningInstance =
+ createRunningInstances(Arrays.asList(
+ "http://xyz/foo",
+ "http://xyz/bar"
+ ));
+
+ EasyMock.expect(mockInstancesRequest.execute()).andReturn(beforeRunningInstance); // 1st call
+ EasyMock.expect(mockInstancesRequest.setMaxResults(500L)).andReturn(mockInstancesRequest);
+ EasyMock.expect(mockInstancesRequest.execute()).andReturn(afterRunningInstance); // 2nd call
+ EasyMock.expect(mockInstancesRequest.setMaxResults(500L)).andReturn(mockInstancesRequest);
+ EasyMock.replay(mockInstancesRequest);
+
+
+ EasyMock.expect(mockInstanceGroupManagers.listManagedInstances(
+ "proj-x",
+ "us-central-1",
+ "druid-mig"
+ )).andReturn(mockInstancesRequest).times(2);
+
+ // set up the delete operation
+ Operation mockResponse = new Operation();
+ mockResponse.setStatus("DONE");
+ mockResponse.setError(new Operation.Error());
+
+ EasyMock.expect(mockDeleteRequest.execute()).andReturn(mockResponse);
+ EasyMock.replay(mockDeleteRequest);
+
+ InstanceGroupManagersDeleteInstancesRequest requestBody =
+ new InstanceGroupManagersDeleteInstancesRequest();
+ requestBody.setInstances(Collections.singletonList("zones/us-central-1/instances/baz"));
+
+ EasyMock.expect(mockInstanceGroupManagers.deleteInstances(
+ "proj-x",
+ "us-central-1",
+ "druid-mig",
+ requestBody
+ )).andReturn(mockDeleteRequest);
+
+ EasyMock.replay(mockInstanceGroupManagers);
+
+ // called twice in getRunningInstances...
+ EasyMock.expect(mockCompute.instanceGroupManagers()).andReturn(mockInstanceGroupManagers);
+ EasyMock.expect(mockCompute.instanceGroupManagers()).andReturn(mockInstanceGroupManagers);
+ // ...and once in terminateWithIds
+ EasyMock.expect(mockCompute.instanceGroupManagers()).andReturn(mockInstanceGroupManagers);
+
+ // and that's all folks!
+ EasyMock.replay(mockCompute);
+
+ AutoScalingData autoScalingData =
+ autoScaler.terminateWithIds(Collections.singletonList("baz"));
+ Assert.assertEquals(1, autoScalingData.getNodeIds().size());
+ Assert.assertEquals("baz", autoScalingData.getNodeIds().get(0));
+
+ EasyMock.verify(mockCompute);
+ EasyMock.verify(mockInstanceGroupManagers);
+ EasyMock.verify(mockDeleteRequest);
+ EasyMock.verify(mockInstancesRequest);
+ }
+
+ @Test
+ public void testTerminate()
+ throws IOException, GeneralSecurityException, GceServiceException
+ {
+ GceAutoScaler autoScaler = EasyMock.createMockBuilder(GceAutoScaler.class).withConstructor(
+ int.class,
+ int.class,
+ GceEnvironmentConfig.class
+ ).withArgs(
+ 2,
+ 4,
+ new GceEnvironmentConfig(1, "proj-x", "us-central-1", "druid-mig")
+ ).addMockedMethod(
+ "createComputeServiceImpl"
+ ).createMock();
+
+ EasyMock.expect(autoScaler.createComputeServiceImpl()).andReturn(null);
+ EasyMock.expect(autoScaler.createComputeServiceImpl()).andReturn(mockCompute);
+ EasyMock.replay(autoScaler);
+
+ // testing the ip --> id part
+ Instance i0 = makeInstance("baz", "1.2.3.6");
+ InstanceList mockInstanceListResponse = new InstanceList();
+ mockInstanceListResponse.setNextPageToken(null);
+ mockInstanceListResponse.setItems(Collections.singletonList(i0));
+
+ EasyMock.expect(mockIpToIdRequest.execute()).andReturn(mockInstanceListResponse);
+ EasyMock.expect(mockIpToIdRequest.setPageToken(EasyMock.anyString())).andReturn(
+ mockIpToIdRequest // the method needs to return something, what is actually irrelevant here
+ );
+ EasyMock.replay(mockIpToIdRequest);
+
+ EasyMock.expect(mockInstances.list("proj-x", "us-central-1")).andReturn(mockIpToIdRequest);
+
+ EasyMock.expect(mockCompute.instances()).andReturn(mockInstances);
+ EasyMock.replay(mockInstances);
+
+ // testing the delete part
+ InstanceGroupManagersListManagedInstancesResponse beforeRunningInstance =
+ createRunningInstances(Arrays.asList(
+ "http://xyz/foo",
+ "http://xyz/bar",
+ "http://xyz/baz"
+ ));
+ InstanceGroupManagersListManagedInstancesResponse afterRunningInstance =
+ createRunningInstances(Arrays.asList(
+ "http://xyz/foo",
+ "http://xyz/bar"
+ ));
+
+ EasyMock.expect(mockInstancesRequest.execute()).andReturn(beforeRunningInstance); // 1st call
+ EasyMock.expect(mockInstancesRequest.setMaxResults(500L)).andReturn(mockInstancesRequest);
+ EasyMock.expect(mockInstancesRequest.execute()).andReturn(afterRunningInstance); // 2nd call
+ EasyMock.expect(mockInstancesRequest.setMaxResults(500L)).andReturn(mockInstancesRequest);
+ EasyMock.replay(mockInstancesRequest);
+
+ EasyMock.expect(mockInstanceGroupManagers.listManagedInstances(
+ "proj-x",
+ "us-central-1",
+ "druid-mig"
+ )).andReturn(mockInstancesRequest).times(2);
+
+ // set up the delete operation
+ Operation mockResponse = new Operation();
+ mockResponse.setStatus("DONE");
+ mockResponse.setError(new Operation.Error());
+
+ EasyMock.expect(mockDeleteRequest.execute()).andReturn(mockResponse);
+ EasyMock.replay(mockDeleteRequest);
+
+ InstanceGroupManagersDeleteInstancesRequest requestBody =
+ new InstanceGroupManagersDeleteInstancesRequest();
+ requestBody.setInstances(Collections.singletonList("zones/us-central-1/instances/baz"));
+
+ EasyMock.expect(mockInstanceGroupManagers.deleteInstances(
+ "proj-x",
+ "us-central-1",
+ "druid-mig",
+ requestBody
+ )).andReturn(mockDeleteRequest);
+
+ EasyMock.replay(mockInstanceGroupManagers);
+
+ // called twice in getRunningInstances...
+ EasyMock.expect(mockCompute.instanceGroupManagers()).andReturn(mockInstanceGroupManagers);
+ EasyMock.expect(mockCompute.instanceGroupManagers()).andReturn(mockInstanceGroupManagers);
+ // ...and once in terminateWithIds
+ EasyMock.expect(mockCompute.instanceGroupManagers()).andReturn(mockInstanceGroupManagers);
+
+ // and that's all folks!
+ EasyMock.replay(mockCompute);
+
+ AutoScalingData autoScalingData =
+ autoScaler.terminate(Collections.singletonList("1.2.3.6"));
+ Assert.assertEquals(1, autoScalingData.getNodeIds().size());
+ Assert.assertEquals("baz", autoScalingData.getNodeIds().get(0));
+
+ EasyMock.verify(mockCompute);
+ EasyMock.verify(mockIpToIdRequest);
+ EasyMock.verify(mockInstanceGroupManagers);
+ EasyMock.verify(mockDeleteRequest);
+ EasyMock.verify(mockInstancesRequest);
+ }
+
+ @Test
+ public void testTerminateWithIdsWithMissingRemoval()
+ throws IOException, GeneralSecurityException, GceServiceException
+ {
+ GceAutoScaler autoScaler = EasyMock.createMockBuilder(GceAutoScaler.class).withConstructor(
+ int.class,
+ int.class,
+ GceEnvironmentConfig.class
+ ).withArgs(
+ 2,
+ 4,
+ new GceEnvironmentConfig(1, "proj-x", "us-central-1", "druid-mig")
+ ).addMockedMethod(
+ "createComputeServiceImpl"
+ ).createMock();
+
+ EasyMock.expect(autoScaler.createComputeServiceImpl()).andReturn(null);
+ EasyMock.expect(autoScaler.createComputeServiceImpl()).andReturn(mockCompute);
+ EasyMock.replay(autoScaler);
+
+ // set up getRunningInstances results
+ InstanceGroupManagersListManagedInstancesResponse beforeRunningInstance =
+ createRunningInstances(Arrays.asList(
+ "http://xyz/foo",
+ "http://xyz/bar",
+ "http://xyz/baz"
+ ));
+ InstanceGroupManagersListManagedInstancesResponse after1RunningInstance =
+ createRunningInstances(Arrays.asList(
+ "http://xyz/foo",
+ "http://xyz/bar",
+ "http://xyz/baz"
+ )); // not changing anything, will trigger the loop around getRunningInstances
+ InstanceGroupManagersListManagedInstancesResponse after2RunningInstance =
+ createRunningInstances(Arrays.asList(
+ "http://xyz/foo",
+ "http://xyz/bar"
+ )); // now the machine got dropped!
+
+ EasyMock.expect(mockInstancesRequest.execute()).andReturn(beforeRunningInstance); // 1st call
+ EasyMock.expect(mockInstancesRequest.setMaxResults(500L)).andReturn(mockInstancesRequest);
+ EasyMock.expect(mockInstancesRequest.execute()).andReturn(after1RunningInstance); // 2nd call, the next is needed
+ EasyMock.expect(mockInstancesRequest.setMaxResults(500L)).andReturn(mockInstancesRequest);
+ EasyMock.expect(mockInstancesRequest.execute()).andReturn(after2RunningInstance); // 3rd call, this unblocks
+ EasyMock.expect(mockInstancesRequest.setMaxResults(500L)).andReturn(mockInstancesRequest);
+ EasyMock.replay(mockInstancesRequest);
+
+
+ EasyMock.expect(mockInstanceGroupManagers.listManagedInstances(
+ "proj-x",
+ "us-central-1",
+ "druid-mig"
+ )).andReturn(mockInstancesRequest).times(3);
+
+ // set up the delete operation
+ Operation mockResponse = new Operation();
+ mockResponse.setStatus("DONE");
+ mockResponse.setError(new Operation.Error());
+
+ EasyMock.expect(mockDeleteRequest.execute()).andReturn(mockResponse);
+ EasyMock.replay(mockDeleteRequest);
+
+ InstanceGroupManagersDeleteInstancesRequest requestBody =
+ new InstanceGroupManagersDeleteInstancesRequest();
+ requestBody.setInstances(Collections.singletonList("zones/us-central-1/instances/baz"));
+
+ EasyMock.expect(mockInstanceGroupManagers.deleteInstances(
+ "proj-x",
+ "us-central-1",
+ "druid-mig",
+ requestBody
+ )).andReturn(mockDeleteRequest);
+
+ EasyMock.replay(mockInstanceGroupManagers);
+
+ // called three times in getRunningInstances...
+ EasyMock.expect(mockCompute.instanceGroupManagers()).andReturn(mockInstanceGroupManagers);
+ EasyMock.expect(mockCompute.instanceGroupManagers()).andReturn(mockInstanceGroupManagers);
+ EasyMock.expect(mockCompute.instanceGroupManagers()).andReturn(mockInstanceGroupManagers);
+ // ...and once in terminateWithIds
+ EasyMock.expect(mockCompute.instanceGroupManagers()).andReturn(mockInstanceGroupManagers);
+
+ // and that's all folks!
+ EasyMock.replay(mockCompute);
+
+ AutoScalingData autoScalingData =
+ autoScaler.terminateWithIds(Collections.singletonList("baz"));
+ Assert.assertEquals(1, autoScalingData.getNodeIds().size());
+ Assert.assertEquals("baz", autoScalingData.getNodeIds().get(0));
+
+ EasyMock.verify(mockCompute);
+ EasyMock.verify(mockInstanceGroupManagers);
+ EasyMock.verify(mockDeleteRequest);
+ EasyMock.verify(mockInstancesRequest);
+ }
+
+ @Test
+ public void testProvision()
+ throws IOException, GeneralSecurityException, GceServiceException
+ {
+ GceAutoScaler autoScaler = EasyMock.createMockBuilder(GceAutoScaler.class).withConstructor(
+ int.class,
+ int.class,
+ GceEnvironmentConfig.class
+ ).withArgs(
+ 2,
+ 4,
+ new GceEnvironmentConfig(1, "proj-x", "us-central-1", "druid-mig")
+ ).addMockedMethod(
+ "createComputeServiceImpl"
+ ).createMock();
+
+ EasyMock.expect(autoScaler.createComputeServiceImpl()).andReturn(null);
+ EasyMock.expect(autoScaler.createComputeServiceImpl()).andReturn(mockCompute);
+ EasyMock.replay(autoScaler);
+
+ // set up getRunningInstances results
+ InstanceGroupManagersListManagedInstancesResponse beforeRunningInstance =
+ createRunningInstances(Arrays.asList(
+ "http://xyz/foo",
+ "http://xyz/bar"
+ ));
+ InstanceGroupManagersListManagedInstancesResponse afterRunningInstance =
+ createRunningInstances(Arrays.asList(
+ "http://xyz/foo",
+ "http://xyz/bar",
+ "http://xyz/baz"
+ ));
+
+ EasyMock.expect(mockInstancesRequest.execute()).andReturn(beforeRunningInstance); // 1st call
+ EasyMock.expect(mockInstancesRequest.setMaxResults(500L)).andReturn(mockInstancesRequest);
+ EasyMock.expect(mockInstancesRequest.execute()).andReturn(afterRunningInstance); // 2nd call
+ EasyMock.expect(mockInstancesRequest.setMaxResults(500L)).andReturn(mockInstancesRequest);
+ EasyMock.replay(mockInstancesRequest);
+
+ EasyMock.expect(mockInstanceGroupManagers.listManagedInstances(
+ "proj-x",
+ "us-central-1",
+ "druid-mig"
+ )).andReturn(mockInstancesRequest).times(2);
+
+ // set up the resize operation
+ Operation mockResponse = new Operation();
+ mockResponse.setStatus("DONE");
+ mockResponse.setError(new Operation.Error());
+
+ EasyMock.expect(mockResizeRequest.execute()).andReturn(mockResponse);
+ EasyMock.replay(mockResizeRequest);
+
+ EasyMock.expect(mockInstanceGroupManagers.resize(
+ "proj-x",
+ "us-central-1",
+ "druid-mig",
+ 3
+ )).andReturn(mockResizeRequest);
+
+ EasyMock.replay(mockInstanceGroupManagers);
+
+ // called twice in getRunningInstances...
+ EasyMock.expect(mockCompute.instanceGroupManagers()).andReturn(mockInstanceGroupManagers);
+ EasyMock.expect(mockCompute.instanceGroupManagers()).andReturn(mockInstanceGroupManagers);
+ // ...and once in provision
+ EasyMock.expect(mockCompute.instanceGroupManagers()).andReturn(mockInstanceGroupManagers);
+
+ // and that's all folks!
+ EasyMock.replay(mockCompute);
+
+ AutoScalingData autoScalingData = autoScaler.provision();
+ Assert.assertEquals(1, autoScalingData.getNodeIds().size());
+ Assert.assertEquals("baz", autoScalingData.getNodeIds().get(0));
+
+ EasyMock.verify(mockCompute);
+ EasyMock.verify(mockInstanceGroupManagers);
+ EasyMock.verify(mockResizeRequest);
+ EasyMock.verify(mockInstancesRequest);
+ }
+
+ @Test
+ public void testProvisionSkipped()
+ throws IOException, GeneralSecurityException, GceServiceException
+ {
+ GceAutoScaler autoScaler = EasyMock.createMockBuilder(GceAutoScaler.class).withConstructor(
+ int.class,
+ int.class,
+ GceEnvironmentConfig.class
+ ).withArgs(
+ 2,
+ 4,
+ new GceEnvironmentConfig(1, "proj-x", "us-central-1", "druid-mig")
+ ).addMockedMethod(
+ "createComputeServiceImpl"
+ ).createMock();
+
+ EasyMock.expect(autoScaler.createComputeServiceImpl()).andReturn(null);
+ EasyMock.expect(autoScaler.createComputeServiceImpl()).andReturn(mockCompute);
+ EasyMock.replay(autoScaler);
+
+ // set up getRunningInstances results
+ InstanceGroupManagersListManagedInstancesResponse beforeRunningInstance =
+ createRunningInstances(Arrays.asList(
+ "http://xyz/foo",
+ "http://xyz/bar",
+ "http://xyz/baz",
+ "http://xyz/zab" // already max instances, will not scale
+ ));
+
+ EasyMock.expect(mockInstancesRequest.execute()).andReturn(beforeRunningInstance);
+ EasyMock.expect(mockInstancesRequest.setMaxResults(500L)).andReturn(mockInstancesRequest);
+ EasyMock.replay(mockInstancesRequest);
+
+ EasyMock.expect(mockInstanceGroupManagers.listManagedInstances(
+ "proj-x",
+ "us-central-1",
+ "druid-mig"
+ )).andReturn(mockInstancesRequest);
+
+ EasyMock.expect(mockCompute.instanceGroupManagers()).andReturn(mockInstanceGroupManagers);
+ EasyMock.replay(mockInstanceGroupManagers);
+
+ // and that's all folks!
+ EasyMock.replay(mockCompute);
+
+ AutoScalingData autoScalingData = autoScaler.provision();
+ Assert.assertEquals(0, autoScalingData.getNodeIds().size());
+
+ EasyMock.verify(mockCompute);
+ EasyMock.verify(mockInstancesRequest);
+ EasyMock.verify(mockInstanceGroupManagers);
+ }
+
+ @Test
+ public void testProvisionWithMissingNewInstances()
+ throws IOException, GeneralSecurityException, GceServiceException
+ {
+ GceAutoScaler autoScaler = EasyMock.createMockBuilder(GceAutoScaler.class).withConstructor(
+ int.class,
+ int.class,
+ GceEnvironmentConfig.class
+ ).withArgs(
+ 2,
+ 4,
+ new GceEnvironmentConfig(1, "proj-x", "us-central-1", "druid-mig")
+ ).addMockedMethod(
+ "createComputeServiceImpl"
+ ).createMock();
+
+ EasyMock.expect(autoScaler.createComputeServiceImpl()).andReturn(null);
+ EasyMock.expect(autoScaler.createComputeServiceImpl()).andReturn(mockCompute);
+ EasyMock.replay(autoScaler);
+
+ // set up getRunningInstances results
+ InstanceGroupManagersListManagedInstancesResponse beforeRunningInstance =
+ createRunningInstances(Arrays.asList(
+ "http://xyz/foo",
+ "http://xyz/bar"
+ ));
+ InstanceGroupManagersListManagedInstancesResponse after1RunningInstance =
+ createRunningInstances(Arrays.asList(
+ "http://xyz/foo",
+ "http://xyz/bar"
+ )); // not changing anything, will trigger the loop around getRunningInstances
+ InstanceGroupManagersListManagedInstancesResponse after2RunningInstance =
+ createRunningInstances(Arrays.asList(
+ "http://xyz/foo",
+ "http://xyz/bar",
+ "http://xyz/baz"
+ )); // now the new machine is here!
+
+ EasyMock.expect(mockInstancesRequest.execute()).andReturn(beforeRunningInstance); // 1st call
+ EasyMock.expect(mockInstancesRequest.setMaxResults(500L)).andReturn(mockInstancesRequest);
+ EasyMock.expect(mockInstancesRequest.execute()).andReturn(after1RunningInstance); // 2nd call, the next is needed
+ EasyMock.expect(mockInstancesRequest.setMaxResults(500L)).andReturn(mockInstancesRequest);
+ EasyMock.expect(mockInstancesRequest.execute()).andReturn(after2RunningInstance); // 3rd call, this unblocks
+ EasyMock.expect(mockInstancesRequest.setMaxResults(500L)).andReturn(mockInstancesRequest);
+ EasyMock.replay(mockInstancesRequest);
+
+ EasyMock.expect(mockInstanceGroupManagers.listManagedInstances(
+ "proj-x",
+ "us-central-1",
+ "druid-mig"
+ )).andReturn(mockInstancesRequest).times(3);
+
+ // set up the resize operation
+ Operation mockResponse = new Operation();
+ mockResponse.setStatus("DONE");
+ mockResponse.setError(new Operation.Error());
+
+ EasyMock.expect(mockResizeRequest.execute()).andReturn(mockResponse);
+ EasyMock.replay(mockResizeRequest);
+
+ EasyMock.expect(mockInstanceGroupManagers.resize(
+ "proj-x",
+ "us-central-1",
+ "druid-mig",
+ 3
+ )).andReturn(mockResizeRequest);
+
+ EasyMock.replay(mockInstanceGroupManagers);
+
+ // called three times in getRunningInstances...
+ EasyMock.expect(mockCompute.instanceGroupManagers()).andReturn(mockInstanceGroupManagers);
+ EasyMock.expect(mockCompute.instanceGroupManagers()).andReturn(mockInstanceGroupManagers);
+ EasyMock.expect(mockCompute.instanceGroupManagers()).andReturn(mockInstanceGroupManagers);
+ // ...and once in provision
+ EasyMock.expect(mockCompute.instanceGroupManagers()).andReturn(mockInstanceGroupManagers);
+
+ // and that's all folks!
+ EasyMock.replay(mockCompute);
+
+ AutoScalingData autoScalingData = autoScaler.provision();
+ Assert.assertEquals(1, autoScalingData.getNodeIds().size());
+ Assert.assertEquals("baz", autoScalingData.getNodeIds().get(0));
+
+ EasyMock.verify(mockCompute);
+ EasyMock.verify(mockInstanceGroupManagers);
+ EasyMock.verify(mockResizeRequest);
+ EasyMock.verify(mockInstancesRequest);
+ }
+
+ @Test
+ public void testEquals()
+ {
+ EqualsVerifier.forClass(GceAutoScaler.class).withNonnullFields(
+ "envConfig", "maxNumWorkers", "minNumWorkers"
+ ).withIgnoredFields("cachedComputeService").usingGetClass().verify();
+ }
+
+ @Test
+ public void testFailedComputeCreation()
+ throws IOException, GeneralSecurityException, GceServiceException
+ {
+ GceAutoScaler autoScaler = EasyMock.createMockBuilder(GceAutoScaler.class).withConstructor(
+ int.class,
+ int.class,
+ GceEnvironmentConfig.class
+ ).withArgs(
+ 2,
+ 4,
+ new GceEnvironmentConfig(1, "proj-x", "us-central-1", "druid-mig")
+ ).addMockedMethod(
+ "createComputeServiceImpl"
+ ).createMock();
+
+ EasyMock.expect(autoScaler.createComputeServiceImpl()).andReturn(null);
+ EasyMock.expect(autoScaler.createComputeServiceImpl()).andReturn(null);
+ EasyMock.expect(autoScaler.createComputeServiceImpl()).andReturn(null);
+ EasyMock.expect(autoScaler.createComputeServiceImpl()).andReturn(null);
+ EasyMock.expect(autoScaler.createComputeServiceImpl()).andReturn(null);
+ EasyMock.replay(autoScaler);
+
+ List<String> ips = Collections.singletonList("1.2.3.4");
+ List<String> ids = autoScaler.ipToIdLookup(ips);
+ Assert.assertEquals(0, ids.size()); // Exception caught in execution results in empty result
+ }
+
+}
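Taken together, testProvision, testProvisionWithMissingNewInstances and testTerminateWithIdsWithMissingRemoval describe a polling contract. The autoscaler lists the managed instances, issues a resize (to 3 when 2 are running) or a deleteInstances call, and then keeps re-listing until the instance set actually changes. The Java below is a hypothetical, self-contained sketch of that wait loop only. The class MigPollingSketch, its method names, the attempt limit and the sleep interval are illustrative assumptions, not the GceAutoScaler source. The commit message mentions a retry interval and a limit on waiting, but their values are not visible here. The real Compute API calls (listManagedInstances with setMaxResults(500L), resize, deleteInstances) are replaced by a plain Supplier.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch of "wait until the managed instance group changes".
// Not the GceAutoScaler implementation; Compute API calls are replaced by a Supplier.
final class MigPollingSketch
{
  private MigPollingSketch()
  {
  }

  // After a resize: poll until some name not present in `before` shows up.
  // Returns the new names, or an empty list if maxAttempts polls see no change.
  static List<String> waitForNewInstances(
      Set<String> before,
      Supplier<List<String>> listInstances,
      int maxAttempts,
      long sleepMillis
  )
  {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      List<String> added = new ArrayList<>();
      for (String name : listInstances.get()) {
        if (!before.contains(name)) {
          added.add(name);
        }
      }
      if (!added.isEmpty()) {
        return added;
      }
      if (attempt + 1 < maxAttempts && !sleepQuietly(sleepMillis)) {
        break; // interrupted: give up early
      }
    }
    return new ArrayList<>();
  }

  // After a deleteInstances call: poll until none of `removed` is listed any more.
  static boolean waitForRemoval(
      Set<String> removed,
      Supplier<List<String>> listInstances,
      int maxAttempts,
      long sleepMillis
  )
  {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      Set<String> stillThere = new HashSet<>(listInstances.get());
      stillThere.retainAll(removed);
      if (stillThere.isEmpty()) {
        return true;
      }
      if (attempt + 1 < maxAttempts && !sleepQuietly(sleepMillis)) {
        break; // interrupted: give up early
      }
    }
    return false;
  }

  // Sleeps; if interrupted, restores the interrupt flag and returns false.
  private static boolean sleepQuietly(long millis)
  {
    try {
      Thread.sleep(millis);
      return true;
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Suppose waitForNewInstances receives the two post-resize listings from testProvisionWithMissingNewInstances, first [foo, bar] and then [foo, bar, baz], with before = {foo, bar}. It returns [baz] on the second poll. That mirrors the test's expectation that the first unchanged listing simply triggers another round.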
diff --git a/extensions-contrib/gce-extensions/src/test/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceUtilsTest.java b/extensions-contrib/gce-extensions/src/test/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceUtilsTest.java
new file mode 100644
index 0000000..7c88355
--- /dev/null
+++ b/extensions-contrib/gce-extensions/src/test/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceUtilsTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.autoscaling.gce;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ */
+public class GceUtilsTest
+{
+ @Test
+ public void testExtractNameFromInstance()
+ {
+ String instance0 =
+ "https://www.googleapis.com/compute/v1/projects/X/zones/Y/instances/name-of-the-thing";
+ Assert.assertEquals("name-of-the-thing", GceUtils.extractNameFromInstance(instance0));
+
+ String instance1 = "https://www.googleapis.com/compute/v1/projects/X/zones/Y/instances/";
+ Assert.assertEquals("", GceUtils.extractNameFromInstance(instance1));
+
+ String instance2 = "name-of-the-thing";
+ Assert.assertEquals("name-of-the-thing", GceUtils.extractNameFromInstance(instance2));
+
+ String instance3 = null;
+ Assert.assertEquals(null, GceUtils.extractNameFromInstance(instance3));
+
+ String instance4 = "";
+ Assert.assertEquals("", GceUtils.extractNameFromInstance(instance4));
+ }
+
+ @Test
+ public void testBuildFilter()
+ {
+ List<String> list0 = null;
+ try {
+ String x = GceUtils.buildFilter(list0, "name");
+ Assert.fail("Exception should have been thrown!");
+ }
+ catch (IllegalArgumentException e) {
+ // ok to be here!
+ }
+
+ List<String> list1 = new ArrayList<>();
+ try {
+ String x = GceUtils.buildFilter(list1, "name");
+ Assert.fail("Exception should have been thrown!");
+ }
+ catch (IllegalArgumentException e) {
+ // ok to be here!
+ }
+
+ List<String> list2 = new ArrayList<>();
+ list2.add("foo");
+ try {
+ String x = GceUtils.buildFilter(list2, null);
+ Assert.fail("Exception should have been thrown!");
+ }
+ catch (IllegalArgumentException e) {
+ // ok to be here!
+ }
+
+ List<String> list3 = new ArrayList<>();
+ list3.add("foo");
+ Assert.assertEquals("(name = \"foo\")", GceUtils.buildFilter(list3, "name"));
+
+ List<String> list4 = new ArrayList<>();
+ list4.add("foo");
+ list4.add("bar");
+ Assert.assertEquals(
+ "(name = \"foo\") OR (name = \"bar\")",
+ GceUtils.buildFilter(list4, "name")
+ );
+ }
+
+}
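GceUtilsTest pins down the observable behaviour of the two helpers. extractNameFromInstance keeps whatever follows the last '/'. buildFilter joins one equality clause per value with OR, and rejects a null or empty list or a null key with IllegalArgumentException. The following is a hypothetical re-implementation consistent with those assertions. GceUtilsSketch is an illustrative name, and the extension's actual GceUtils may be written differently.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical re-implementation written only to match the assertions in GceUtilsTest;
// it is not the GceUtils class shipped in gce-extensions.
final class GceUtilsSketch
{
  private GceUtilsSketch()
  {
  }

  // Returns what follows the last '/' (so "" for a URL ending in '/'),
  // the input unchanged if it has no '/', and null for null input.
  static String extractNameFromInstance(String instance)
  {
    if (instance == null) {
      return null;
    }
    return instance.substring(instance.lastIndexOf('/') + 1);
  }

  // Builds "(key = \"v1\") OR (key = \"v2\") ..." from a non-empty list of values.
  // Values are not escaped; embedded quotes would produce an invalid filter.
  static String buildFilter(List<String> values, String key)
  {
    if (values == null || values.isEmpty()) {
      throw new IllegalArgumentException("values must be a non-empty list");
    }
    if (key == null) {
      throw new IllegalArgumentException("key must not be null");
    }
    return values.stream()
                 .map(value -> "(" + key + " = \"" + value + "\")")
                 .collect(Collectors.joining(" OR "));
  }
}
```

With the list [foo, bar] and key name, buildFilter returns (name = "foo") OR (name = "bar"), which is exactly the string testBuildFilter expects.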
diff --git a/extensions-core/google-extensions/pom.xml b/extensions-core/google-extensions/pom.xml
index e17da6d..c820aa9 100644
--- a/extensions-core/google-extensions/pom.xml
+++ b/extensions-core/google-extensions/pom.xml
@@ -34,7 +34,7 @@
</parent>
<properties>
- <com.google.apis.storage.version>v1-rev79-${com.google.apis.client.version}</com.google.apis.storage.version>
+ <com.google.apis.storage.version>v1-rev158-${com.google.apis.client.version}</com.google.apis.storage.version>
</properties>
<dependencies>
diff --git a/licenses.yaml b/licenses.yaml
index 97b2272..0b7e270 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -4086,12 +4086,22 @@ name: Google Cloud Storage JSON API
license_category: binary
module: extensions/druid-google-extensions
license_name: Apache License version 2.0
-version: v1-rev79-1.22.0
+version: v1-rev158-1.25.0
libraries:
- com.google.apis: google-api-services-storage
---
+name: Google Compute Engine API
+license_category: binary
+module: extensions/gce-extensions
+license_name: Apache License version 2.0
+version: v1-rev214-1.25.0
+libraries:
+ - com.google.apis: google-api-services-compute
+
+---
+
name: "Jackson Module: Guice"
license_category: binary
module: java-core
@@ -4106,7 +4116,7 @@ name: Google APIs Client Library For Java
license_category: binary
module: java-core
license_name: Apache License version 2.0
-version: 1.22.0
+version: 1.25.0
libraries:
- com.google.api-client: google-api-client
@@ -4116,7 +4126,7 @@ name: Google HTTP Client Library For Java
license_category: binary
module: java-core
license_name: Apache License version 2.0
-version: 1.22.0
+version: 1.25.0
libraries:
- com.google.http-client: google-http-client
- com.google.http-client: google-http-client-jackson2
diff --git a/pom.xml b/pom.xml
index 24c24b7..fdf344b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,8 +114,8 @@
<!-- When upgrading ZK, edit docs and integration tests as well (integration-tests/docker-base/setup.sh) -->
<zookeeper.version>3.4.14</zookeeper.version>
<checkerframework.version>2.5.7</checkerframework.version>
- <com.google.apis.client.version>1.22.0</com.google.apis.client.version>
-
+ <com.google.apis.client.version>1.25.0</com.google.apis.client.version>
+ <com.google.apis.compute.version>v1-rev214-1.25.0</com.google.apis.compute.version>
<repoOrgId>apache.snapshots</repoOrgId>
<repoOrgName>Apache Snapshot Repository</repoOrgName>
<repoOrgUrl>https://repository.apache.org/snapshots</repoOrgUrl>
@@ -188,6 +188,7 @@
<module>extensions-contrib/moving-average-query</module>
<module>extensions-contrib/tdigestsketch</module>
<module>extensions-contrib/influxdb-emitter</module>
+ <module>extensions-contrib/gce-extensions</module>
<!-- distribution packaging -->
<module>distribution</module>
</modules>
@@ -1193,6 +1194,13 @@
<artifactId>resilience4j-bulkhead</artifactId>
<version>${resilience4j.version}</version>
</dependency>
+ <!-- GCE -->
+ <dependency>
+ <groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-compute</artifactId>
+ <version>${com.google.apis.compute.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.testng</groupId>
diff --git a/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java b/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java
index 206c318..2ab9f7c 100644
--- a/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java
+++ b/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java
@@ -511,7 +511,7 @@ public class JettyTest extends BaseJettyTest
{
// it can take a bit to close the connection, so maybe sleep for a while and hope it closes
final int sleepTimeMills = 10;
- final int totalSleeps = 5_000 / sleepTimeMills;
+ final int totalSleeps = 10_000 / sleepTimeMills;
int count = 0;
while (jsm.getActiveConnections() > 0 && count++ < totalSleeps) {
Thread.sleep(sleepTimeMills);
diff --git a/website/.spelling b/website/.spelling
index 531d083..dd783f4 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1548,6 +1548,7 @@ EventReceiverFirehose
File.getFreeSpace
File.getTotalSpace
ForkJoinPool
+GCE
HadoopIndexTasks
HttpEmitter
HttpPostEmitter
@@ -1556,6 +1557,7 @@ JRE8u60
KeyManager
L1
L2
+ListManagedInstances
LoadSpec
LoggingEmitter
Los_Angeles
@@ -1597,6 +1599,8 @@ affinityConfig
allowAll
ANDed
array_mod
+autoscale
+autoscalers
batch_index_task
cgroup
classloader
@@ -1634,6 +1638,8 @@ floatMax
floatMin
floatSum
freeSpacePercent
+gce
+gce-extensions
getCanonicalHostName
groupBy
hdfs
diff --git a/website/i18n/en.json b/website/i18n/en.json
index c769b68..f4e86f6 100644
--- a/website/i18n/en.json
+++ b/website/i18n/en.json
@@ -101,6 +101,9 @@
"development/extensions-contrib/distinctcount": {
"title": "DistinctCount Aggregator"
},
+ "development/extensions-contrib/gce-extensions": {
+ "title": "GCE Extensions"
+ },
"development/extensions-contrib/graphite": {
"title": "Graphite Emitter"
},
diff --git a/website/sidebars.json b/website/sidebars.json
index 004619a..da8a734 100644
--- a/website/sidebars.json
+++ b/website/sidebars.json
@@ -214,6 +214,7 @@
"development/extensions-contrib/tdigestsketch-quantiles",
"development/extensions-contrib/thrift",
"development/extensions-contrib/time-min-max",
+ "development/extensions-contrib/gce-extensions",
"ingestion/standalone-realtime"
]
}