You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2020/07/08 13:18:18 UTC
[brooklyn-server] 08/20: Change lebels
This is an automated email from the ASF dual-hosted git repository.
heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 44816f179bed918de552ce2681cd9a80b638bd0b
Author: Duncan Grant <du...@cloudsoft.io>
AuthorDate: Tue Jul 7 12:06:20 2020 +0100
Change lebels
---
.../brooklyn/container/entity/helm/HelmEntity.java | 6 +-
.../container/entity/helm/HelmEntityImpl.java | 114 ++++++++++++++-------
.../container/entity/helm/HelmEntityLiveTest.java | 27 ++---
3 files changed, 96 insertions(+), 51 deletions(-)
diff --git a/locations/container/src/main/java/org/apache/brooklyn/container/entity/helm/HelmEntity.java b/locations/container/src/main/java/org/apache/brooklyn/container/entity/helm/HelmEntity.java
index 7262f11..4dba320 100644
--- a/locations/container/src/main/java/org/apache/brooklyn/container/entity/helm/HelmEntity.java
+++ b/locations/container/src/main/java/org/apache/brooklyn/container/entity/helm/HelmEntity.java
@@ -32,7 +32,7 @@ import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.entity.software.base.SoftwareProcess;
@ImplementedBy(HelmEntityImpl.class)
-public interface HelmEntity extends Entity, Resizable, Startable {
+public interface HelmEntity extends Entity, Startable {
public static final ConfigKey<String> REPO_NAME = ConfigKeys.newStringConfigKey(
"repo.name",
@@ -66,4 +66,8 @@ public interface HelmEntity extends Entity, Resizable, Startable {
AttributeSensor<Integer> REPLICAS = Sensors.newIntegerSensor("kube.replicas",
"The number of replicas");
+
+ @Effector(description="")
+ Integer resize(@EffectorParam(name="deplymentName") String name, @EffectorParam(name="desiredSize") Integer desiredSize);
+
}
diff --git a/locations/container/src/main/java/org/apache/brooklyn/container/entity/helm/HelmEntityImpl.java b/locations/container/src/main/java/org/apache/brooklyn/container/entity/helm/HelmEntityImpl.java
index 393298d..b3693bb 100644
--- a/locations/container/src/main/java/org/apache/brooklyn/container/entity/helm/HelmEntityImpl.java
+++ b/locations/container/src/main/java/org/apache/brooklyn/container/entity/helm/HelmEntityImpl.java
@@ -22,20 +22,25 @@ import com.google.common.base.Functions;
import com.google.common.collect.ImmutableList;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentList;
+import io.fabric8.kubernetes.api.model.apps.DoneableDeployment;
import io.fabric8.kubernetes.client.*;
-import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
+import io.fabric8.kubernetes.client.dsl.*;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.container.location.kubernetes.KubernetesLocation;
import org.apache.brooklyn.core.entity.AbstractEntity;
import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.feed.function.FunctionFeed;
import org.apache.brooklyn.feed.function.FunctionPollConfig;
import org.apache.brooklyn.util.core.internal.ssh.process.ProcessTool;
import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -46,11 +51,13 @@ import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
-import java.util.function.Function;
-import java.util.stream.Collectors;
public class HelmEntityImpl extends AbstractEntity implements HelmEntity {
+ private static final Logger LOG = LoggerFactory.getLogger(HelmEntityImpl.class);
+
+ private FunctionFeed serviceUpFeed;
+
@Override
public void init() {
super.init();
@@ -66,7 +73,7 @@ public class HelmEntityImpl extends AbstractEntity implements HelmEntity {
private void connectServiceUpIsRunning() {
Duration period = Duration.FIVE_SECONDS;
- FunctionFeed.builder()
+ serviceUpFeed = FunctionFeed.builder()
.entity(this)
.period(period)
.poll(new FunctionPollConfig<Boolean, Boolean>(Attributes.SERVICE_UP)
@@ -83,6 +90,12 @@ public class HelmEntityImpl extends AbstractEntity implements HelmEntity {
private void addKubernetesFeeds() {
Callable status = getKubeDeploymentsCallable();
+// FunctionSensor<Integer> initializer = new FunctionSensor<Integer>(ConfigBag.newInstance()
+// .configure(FunctionSensor.SENSOR_PERIOD, Duration.millis(1000))
+// .configure(FunctionSensor.SENSOR_NAME, DEPLOYMENT_READY.getName())
+// .configure(FunctionSensor.SENSOR_TYPE, Boolean.class.getName())
+// .configure(FunctionSensor.FUNCTION, status));
+// initializer.apply(this);
addFeed(FunctionFeed.builder()
.entity(this)
@@ -99,7 +112,7 @@ public class HelmEntityImpl extends AbstractEntity implements HelmEntity {
.period(Duration.FIVE_SECONDS)
.build());
- Callable availableReplicas = getKubeReplicasCallable();
+ Callable availableReplicas = getKubeReplicasAvailableCallable();
addFeed(FunctionFeed.builder()
.entity(this)
.poll(new FunctionPollConfig<String, Integer>(AVAILABLE_REPLICAS)
@@ -128,16 +141,15 @@ public class HelmEntityImpl extends AbstractEntity implements HelmEntity {
}
private void disconnectServiceUpIsRunning() {
- // TODO
+ serviceUpFeed.stop();
}
@Override
- public Integer resize(Integer desiredSize) {
- scaleDeployment(desiredSize);
+ public Integer resize(String deploymentName, Integer desiredSize) {
+ scaleDeployment(desiredSize, deploymentName);
return desiredSize;
}
- @Override
public Integer getCurrentSize() {
return sensors().get(REPLICAS);
}
@@ -165,7 +177,7 @@ public class HelmEntityImpl extends AbstractEntity implements HelmEntity {
@Override
public void run() {
ImmutableList<String> installHelmTemplateCommand =
- ImmutableList.<String>of(buildCommand(repo_name, repo_url, install_values, namespace));
+ ImmutableList.<String>of(buildAddRepoCommand(repo_name, repo_url));
OutputStream out = new ByteArrayOutputStream();
OutputStream err = new ByteArrayOutputStream();
ProcessTool.execProcesses(installHelmTemplateCommand, null, null, out, err, ";", false, this);
@@ -176,14 +188,18 @@ public class HelmEntityImpl extends AbstractEntity implements HelmEntity {
@Override
public void run() {
ImmutableList<String> installHelmTemplateCommand =
- ImmutableList.<String>of(String.format("helm install %s %s", helm_deployment_name, helm_template));
- OutputStream out = new ByteArrayOutputStream();
- OutputStream err = new ByteArrayOutputStream();
+ ImmutableList.<String>of(buildInstallCommand(helm_deployment_name, helm_template, install_values, namespace));
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream();
+
ProcessTool.execProcesses(installHelmTemplateCommand, null, null, out, err, ";", false, this);
+
+ Tasks.addTagDynamically(BrooklynTaskTags.tagForStreamSoft(BrooklynTaskTags.STREAM_STDOUT, out));
+ Tasks.addTagDynamically(BrooklynTaskTags.tagForStreamSoft(BrooklynTaskTags.STREAM_STDERR, err));
}});
- //TODO Do something with output
}
+
private String getNamespace() {
return getLocation().getConfig(KubernetesLocation.NAMESPACE);
}
@@ -216,17 +232,20 @@ public class HelmEntityImpl extends AbstractEntity implements HelmEntity {
public boolean isRunning() {
String helm_name_install_name = getConfig(HelmEntity.HELM_DEPLOYMENT_NAME);
- ImmutableList<String> command = ImmutableList.<String>of(String.format("helm status %s", helm_name_install_name));
+ String namespace = getNamespace();
+ ImmutableList<String> command = ImmutableList.<String>of(String.format("helm status %s --namespace %s", helm_name_install_name, namespace));
OutputStream out = new ByteArrayOutputStream();
OutputStream err = new ByteArrayOutputStream();
- return 0 == ProcessTool.execProcesses(command, null, null, out, err,";",false, this);
+ int exectionResponse = ProcessTool.execProcesses(command, null, null, out, err, ";", false, this);
+ return 0 == exectionResponse;
}
public Callable<String> getCallable(String command) {
String helm_name_install_name = getConfig(HelmEntity.HELM_DEPLOYMENT_NAME);
+ String namespace = getNamespace();
ImmutableList<String> installHelmTemplateCommand =
- ImmutableList.<String>of(String.format("helm %s %s", command, helm_name_install_name));
+ ImmutableList.<String>of(String.format("helm %s %s --namespace %s", command, helm_name_install_name, namespace));
return new Callable<String>() {
@Override
@@ -245,57 +264,72 @@ public class HelmEntityImpl extends AbstractEntity implements HelmEntity {
return new Callable() {
@Override
- public Object call() throws Exception {
+ public Boolean call() throws Exception {
KubernetesClient client = getClient(config);
- Deployment deploy = client.apps().deployments().inNamespace(getNamespace()).withName(helm_name_install_name).get();
- FilterWatchListDeletable<Deployment, DeploymentList, Boolean, Watch, Watcher<Deployment>> release = client.apps().deployments().inNamespace(getNamespace()).withLabel("release", helm_name_install_name);
- DeploymentList list = release.list();
- List<Deployment> deployments = list.getItems();
- Integer availableReplicas = deployments.stream().map(deployment -> deployment.getStatus().getAvailableReplicas()).collect(Collectors.summingInt(Integer::intValue));
- Integer replicas = deployments.stream().map(deployment -> deployment.getStatus().getReplicas()).collect(Collectors.summingInt(Integer::intValue));
+ List<Deployment> deployments = getDeployments(client, helm_name_install_name);
+ Integer availableReplicas = countAvailableReplicas(deployments);
+ Integer replicas = countReplicas(deployments);
return availableReplicas.equals(replicas);
} ;
};
}
+ private List<Deployment> getDeployments(KubernetesClient client, String helm_name_install_name) {
+ AppsAPIGroupDSL apps = client.apps();
+ MixedOperation<Deployment, DeploymentList, DoneableDeployment, RollableScalableResource<Deployment, DoneableDeployment>> deployments1 = apps.deployments();
+ String namespace = getNamespace();
+ NonNamespaceOperation<Deployment, DeploymentList, DoneableDeployment, RollableScalableResource<Deployment, DoneableDeployment>> deploymentDeploymentListDoneableDeploymentRollableScalableResourceNonNamespaceOperation = deployments1.inNamespace(namespace);
+ FilterWatchListDeletable<Deployment, DeploymentList, Boolean, Watch, Watcher<Deployment>> release1 = deploymentDeploymentListDoneableDeploymentRollableScalableResourceNonNamespaceOperation.withLabel("app.kubernetes.io/instance", helm_name_install_name);
+ FilterWatchListDeletable<Deployment, DeploymentList, Boolean, Watch, Watcher<Deployment>> release = release1;
+ DeploymentList list = release.list();
+ List<Deployment> deployments = list.getItems();
+ return client.apps().deployments().inNamespace(getNamespace()).withLabel("app.kubernetes.io/instance", helm_name_install_name).list().getItems();
+ }
+
+ private Integer countReplicas(List<Deployment> deployments) {
+ return deployments.stream().map(deployment -> deployment.getStatus().getReplicas()).mapToInt(Integer::intValue).sum();
+ }
+
+ private Integer countAvailableReplicas(List<Deployment> deployments) {
+ return deployments.stream().map(deployment -> deployment.getStatus().getAvailableReplicas()).mapToInt(Integer::intValue).sum();
+ }
+
private KubernetesLocation getLocation() {
return (KubernetesLocation) getLocations().stream().filter(KubernetesLocation.class::isInstance).findFirst().get();
}
+ //TODO get rid of this
public Callable getKubeReplicasCallable() {
String helm_name_install_name = getConfig(HelmEntity.HELM_DEPLOYMENT_NAME);
String config = getLocation().getConfig(KubernetesLocation.KUBECONFIG);
return new Callable() {
@Override
- public Object call() throws Exception {
+ public Integer call() throws Exception {
KubernetesClient client = getClient(config);
- Deployment deploy = client.apps().deployments().inNamespace(getNamespace()).withName(helm_name_install_name).get();
- return deploy.getStatus().getReplicas();
+ return countReplicas(getDeployments(client, helm_name_install_name));
} ;
};
}
+ //TODO get rid of this
public Callable getKubeReplicasAvailableCallable() {
String helm_name_install_name = getConfig(HelmEntity.HELM_DEPLOYMENT_NAME);
String config = getLocation().getConfig(KubernetesLocation.KUBECONFIG);
return new Callable() {
@Override
- public Object call() throws Exception {
+ public Integer call() throws Exception {
KubernetesClient client = getClient(config);
- Deployment deploy = client.apps().deployments().inNamespace(getNamespace()).withName(helm_name_install_name).get();
- return deploy.getStatus().getAvailableReplicas();
+ return countAvailableReplicas(getDeployments(client, helm_name_install_name));
} ;
};
}
- public void scaleDeployment(Integer scale) {
- String helm_name_install_name = getConfig(HelmEntity.HELM_DEPLOYMENT_NAME);
-
+ public void scaleDeployment(Integer scale, String deploymentName) {
String config = getLocation().getConfig(KubernetesLocation.KUBECONFIG);
KubernetesClient client = getClient(config);
- client.apps().deployments().inNamespace(getNamespace()).withName(helm_name_install_name).scale(scale);
+ client.apps().deployments().inNamespace(getNamespace()).withName(deploymentName).scale(scale);
}
KubernetesClient getClient(String configFile) {
@@ -310,10 +344,16 @@ public class HelmEntityImpl extends AbstractEntity implements HelmEntity {
}
}
- private String buildCommand(String repo_name, String repo_url, String install_values, String namespace) {
+ private String buildAddRepoCommand(String repo_name, String repo_url) {
String install_command = String.format("helm repo add %s %s", repo_name, repo_url);
- if(Strings.isNonBlank(install_values)) {
- install_command += String.format(" --values %s", install_values);
+ return install_command;
+ }
+
+ private String buildInstallCommand(String helmDeploymentName, String helmTemplate, String installValues, String namespace) {
+ String install_command = String.format("helm install %s %s", helmDeploymentName, helmTemplate);
+
+ if(Strings.isNonBlank(installValues)) {
+ install_command += String.format(" --values %s", installValues);
}
if(Strings.isNonBlank(namespace)) {
install_command += String.format(" --namespace %s", namespace);
diff --git a/locations/container/src/test/java/org/apache/brooklyn/container/entity/helm/HelmEntityLiveTest.java b/locations/container/src/test/java/org/apache/brooklyn/container/entity/helm/HelmEntityLiveTest.java
index 806a267..d819547 100644
--- a/locations/container/src/test/java/org/apache/brooklyn/container/entity/helm/HelmEntityLiveTest.java
+++ b/locations/container/src/test/java/org/apache/brooklyn/container/entity/helm/HelmEntityLiveTest.java
@@ -26,7 +26,9 @@ import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.container.location.kubernetes.KubernetesLocation;
import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.core.test.BrooklynAppLiveTestSupport;
+import org.apache.brooklyn.test.Asserts;
import org.apache.brooklyn.util.collections.MutableMap;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
import javax.annotation.Nullable;
@@ -38,6 +40,13 @@ import static org.apache.brooklyn.core.entity.EntityAsserts.assertPredicateEvent
public class HelmEntityLiveTest extends BrooklynAppLiveTestSupport {
+ @AfterMethod(alwaysRun = true, timeOut = Asserts.THIRTY_SECONDS_TIMEOUT_MS)
+ @Override
+ public void tearDown() throws Exception {
+ app.stop();
+ super.tearDown();
+ }
+
@Test
public void testSimpleDeploy() throws Exception {
HelmEntity andManageChild = newHelmSpec("nginx-test", "bitnami/nginx");
@@ -45,7 +54,7 @@ public class HelmEntityLiveTest extends BrooklynAppLiveTestSupport {
app.start(newKubernetesLocation());
assertAttributeEqualsEventually(andManageChild, Attributes.SERVICE_UP, true);
- app.stop();
+ assertAttributeEqualsEventually(andManageChild, HelmEntity.DEPLOYMENT_READY, true);
}
@@ -62,7 +71,6 @@ public class HelmEntityLiveTest extends BrooklynAppLiveTestSupport {
return status == null? false : status.contains("STATUS: deployed");
}
});
- app.stop();
}
@Test
@@ -72,7 +80,6 @@ public class HelmEntityLiveTest extends BrooklynAppLiveTestSupport {
app.start(newKubernetesLocation());
assertAttributeEqualsEventually(andManageChild, HelmEntity.DEPLOYMENT_READY, true);
- app.stop();
}
@Test
@@ -84,15 +91,12 @@ public class HelmEntityLiveTest extends BrooklynAppLiveTestSupport {
assertAttributeEqualsEventually(andManageChild, HelmEntity.AVAILABLE_REPLICAS, 1);
assertAttributeEqualsEventually(andManageChild, HelmEntity.REPLICAS, 1);
- andManageChild.resize(2);
+ andManageChild.resize("nginx-test",3);
- assertAttributeEqualsEventually(andManageChild, HelmEntity.AVAILABLE_REPLICAS, 2);
- assertAttributeEqualsEventually(andManageChild, HelmEntity.REPLICAS, 2);
+ assertAttributeEqualsEventually(andManageChild, HelmEntity.AVAILABLE_REPLICAS, 3);
+ assertAttributeEqualsEventually(andManageChild, HelmEntity.REPLICAS, 3);
assertAttributeEqualsEventually(andManageChild, HelmEntity.DEPLOYMENT_READY, true);
-
- app.stop();
-
}
@Test
@@ -104,15 +108,12 @@ public class HelmEntityLiveTest extends BrooklynAppLiveTestSupport {
assertAttributeEqualsEventually(andManageChild, HelmEntity.AVAILABLE_REPLICAS, 1);
assertAttributeEqualsEventually(andManageChild, HelmEntity.REPLICAS, 1);
- andManageChild.resize(2);
+ andManageChild.resize("prometheus-server", 2);
assertAttributeEqualsEventually(andManageChild, HelmEntity.AVAILABLE_REPLICAS, 2);
assertAttributeEqualsEventually(andManageChild, HelmEntity.REPLICAS, 2);
assertAttributeEqualsEventually(andManageChild, HelmEntity.DEPLOYMENT_READY, true);
-
- app.stop();
-
}
private HelmEntity newHelmSpec(String templateInstallName, String helmTemplate) {