You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by ka...@apache.org on 2018/04/07 15:58:58 UTC
[incubator-heron] branch master updated: expose heron metrics as a
service in Nomad (#2859)
This is an automated email from the ASF dual-hosted git repository.
karthikz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 2a4c1c1 expose heron metrics as a service in Nomad (#2859)
2a4c1c1 is described below
commit 2a4c1c1613bc2d10851503fcc35c571618a28ec1
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sat Apr 7 08:58:54 2018 -0700
expose heron metrics as a service in Nomad (#2859)
---
heron/config/src/yaml/conf/nomad/heron_nomad.sh | 3 +
.../config/src/yaml/conf/nomad/metrics_sinks.yaml | 3 +-
heron/config/src/yaml/conf/nomad/scheduler.yaml | 17 ++++
.../config/src/yaml/conf/standalone/heron_nomad.sh | 3 +
.../src/yaml/conf/standalone/metrics_sinks.yaml | 3 +-
.../config/src/yaml/conf/standalone/scheduler.yaml | 14 +++
.../standalone/templates/scheduler.template.yaml | 14 +++
.../heron/metricsmgr/sink/AbstractWebSink.java | 7 +-
heron/schedulers/src/java/BUILD | 1 +
.../heron/scheduler/nomad/NomadConstants.java | 5 +
.../heron/scheduler/nomad/NomadContext.java | 40 +++++++-
.../heron/scheduler/nomad/NomadScheduler.java | 95 ++++++++++++++++-
.../heron/scheduler/nomad/NomadSchedulerTest.java | 113 ++++++++++++++++++++-
13 files changed, 305 insertions(+), 13 deletions(-)
diff --git a/heron/config/src/yaml/conf/nomad/heron_nomad.sh b/heron/config/src/yaml/conf/nomad/heron_nomad.sh
index 51ca593..826ba0d 100644
--- a/heron/config/src/yaml/conf/nomad/heron_nomad.sh
+++ b/heron/config/src/yaml/conf/nomad/heron_nomad.sh
@@ -25,6 +25,9 @@ fi
# download and extract heron topology package
${HERON_TOPOLOGY_DOWNLOAD_CMD}
+# set metrics port file
+echo ${NOMAD_PORT_metrics_port} > ${METRICS_PORT_FILE}
+
# launch heron executor
trap 'kill -TERM $PID' TERM INT
${HERON_EXECUTOR_CMD} &
diff --git a/heron/config/src/yaml/conf/nomad/metrics_sinks.yaml b/heron/config/src/yaml/conf/nomad/metrics_sinks.yaml
index b5257e2..a70f43a 100644
--- a/heron/config/src/yaml/conf/nomad/metrics_sinks.yaml
+++ b/heron/config/src/yaml/conf/nomad/metrics_sinks.yaml
@@ -60,7 +60,8 @@ tmaster-sink:
prometheus-sink:
class: "com.twitter.heron.metricsmgr.sink.PrometheusSink"
- port: 8080 # The port on which to run (either port or port-file are mandatory)
+# port: 8080 # The port on which to run (either port or port-file are mandatory)
+ port-file: "port_file"
path: /metrics # The path on which to publish the metrics (mandatory)
flat-metrics: true # By default the web-sink will publish a flat "name -> value" json map
include-topology-name: true # Include topology name in metric name (default false)
diff --git a/heron/config/src/yaml/conf/nomad/scheduler.yaml b/heron/config/src/yaml/conf/nomad/scheduler.yaml
index 297afc4..21d4268 100644
--- a/heron/config/src/yaml/conf/nomad/scheduler.yaml
+++ b/heron/config/src/yaml/conf/nomad/scheduler.yaml
@@ -26,3 +26,20 @@ heron.nomad.driver: "docker"
# The docker image to use for heron if the docker driver is used,
heron.executor.docker.image: 'heron/heron:latest'
+
+# Set the networking mode to use when the driver is docker
+heron.nomad.network.mode: "default"
+
+# whether to register metrics service endpoints for prometheus metrics sink in consul
+# the service will be named in the format: metrics-heron-<topology-name>-<container-index>
+heron.nomad.metrics.service.register: True
+
+# interval at which health checks should be conducted for metrics service endpoint
+heron.nomad.metrics.service.check.interval.sec: 10
+
+# timeout of metrics service endpoint health check
+heron.nomad.metrics.service.check.timeout.sec: 2
+
+# additional tags to be attached to metrics service in a comma delimited list
+# A tag of <topology-name>-<container-index> will be automatically attached
+heron.nomad.metrics.service.additional.tags: "prometheus,metrics,heron"
\ No newline at end of file
diff --git a/heron/config/src/yaml/conf/standalone/heron_nomad.sh b/heron/config/src/yaml/conf/standalone/heron_nomad.sh
index 51ca593..826ba0d 100644
--- a/heron/config/src/yaml/conf/standalone/heron_nomad.sh
+++ b/heron/config/src/yaml/conf/standalone/heron_nomad.sh
@@ -25,6 +25,9 @@ fi
# download and extract heron topology package
${HERON_TOPOLOGY_DOWNLOAD_CMD}
+# set metrics port file
+echo ${NOMAD_PORT_metrics_port} > ${METRICS_PORT_FILE}
+
# launch heron executor
trap 'kill -TERM $PID' TERM INT
${HERON_EXECUTOR_CMD} &
diff --git a/heron/config/src/yaml/conf/standalone/metrics_sinks.yaml b/heron/config/src/yaml/conf/standalone/metrics_sinks.yaml
index b5257e2..a70f43a 100644
--- a/heron/config/src/yaml/conf/standalone/metrics_sinks.yaml
+++ b/heron/config/src/yaml/conf/standalone/metrics_sinks.yaml
@@ -60,7 +60,8 @@ tmaster-sink:
prometheus-sink:
class: "com.twitter.heron.metricsmgr.sink.PrometheusSink"
- port: 8080 # The port on which to run (either port or port-file are mandatory)
+# port: 8080 # The port on which to run (either port or port-file are mandatory)
+ port-file: "port_file"
path: /metrics # The path on which to publish the metrics (mandatory)
flat-metrics: true # By default the web-sink will publish a flat "name -> value" json map
include-topology-name: true # Include topology name in metric name (default false)
diff --git a/heron/config/src/yaml/conf/standalone/scheduler.yaml b/heron/config/src/yaml/conf/standalone/scheduler.yaml
index 48f512f..ff34902 100644
--- a/heron/config/src/yaml/conf/standalone/scheduler.yaml
+++ b/heron/config/src/yaml/conf/standalone/scheduler.yaml
@@ -22,3 +22,17 @@ heron.nomad.core.freq.mapping: 2000
# standalone mode uses the raw_exec driver
heron.nomad.driver: "raw_exec"
+
+# whether to register metrics service endpoints for prometheus metrics sink in consul
+# the service will be named in the format: metrics-heron-<topology-name>-<container-index>
+heron.nomad.metrics.service.register: False
+
+# interval at which health checks should be conducted for metrics service endpoint
+heron.nomad.metrics.service.check.interval.sec: 10
+
+# timeout of metrics service endpoint health check
+heron.nomad.metrics.service.check.timeout.sec: 2
+
+# additional tags to be attached to metrics service in a comma delimited list
+# A tag of <topology-name>-<container-index> will be automatically attached
+heron.nomad.metrics.service.additional.tags: "prometheus,metrics,heron"
diff --git a/heron/config/src/yaml/conf/standalone/templates/scheduler.template.yaml b/heron/config/src/yaml/conf/standalone/templates/scheduler.template.yaml
index 443bb5b..0a1becb 100644
--- a/heron/config/src/yaml/conf/standalone/templates/scheduler.template.yaml
+++ b/heron/config/src/yaml/conf/standalone/templates/scheduler.template.yaml
@@ -22,3 +22,17 @@ heron.nomad.core.freq.mapping: 2000
# standalone mode uses the raw_exec driver
heron.nomad.driver: "raw_exec"
+
+# whether to register metrics service endpoints for prometheus metrics sink in consul
+# the service will be named in the format: metrics-heron-<topology-name>-<container-index>
+heron.nomad.metrics.service.register: False
+
+# interval at which health checks should be conducted for metrics service endpoint
+heron.nomad.metrics.service.check.interval.sec: 10
+
+# timeout of metrics service endpoint health check
+heron.nomad.metrics.service.check.timeout.sec: 2
+
+# additional tags to be attached to metrics service in a comma delimited list
+# A tag of <topology-name>-<container-index> will be automatically attached
+heron.nomad.metrics.service.additional.tags: "prometheus,metrics,heron"
diff --git a/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/sink/AbstractWebSink.java b/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/sink/AbstractWebSink.java
index 13449c6..fd64286 100644
--- a/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/sink/AbstractWebSink.java
+++ b/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/sink/AbstractWebSink.java
@@ -82,7 +82,7 @@ abstract class AbstractWebSink implements IMetricsSink {
@Override
public final void init(Map<String, Object> conf, SinkContext context) {
String path = (String) conf.get(KEY_PATH);
- String portFile = (String) conf.get(KEY_PORT_FILE);
+ String portFile = getServerPortFile(conf);
cacheMaxSize = TypeUtils.getLong(conf.getOrDefault(KEY_METRICS_CACHE_MAX_SIZE,
DEFAULT_MAX_CACHE_SIZE));
@@ -132,6 +132,7 @@ abstract class AbstractWebSink implements IMetricsSink {
os.close();
LOG.log(Level.INFO, "Received metrics request.");
});
+ LOG.info("Starting web sink server on port: " + port);
httpServer.start();
} catch (IOException e) {
throw new RuntimeException("Failed to create Http server on port " + port, e);
@@ -164,4 +165,8 @@ abstract class AbstractWebSink implements IMetricsSink {
httpServer.stop(0);
}
}
+
+ public static String getServerPortFile(Map<String, Object> conf) {
+ return (String) conf.get(KEY_PORT_FILE);
+ }
}
diff --git a/heron/schedulers/src/java/BUILD b/heron/schedulers/src/java/BUILD
index 24bdb3f..4a83f2f 100644
--- a/heron/schedulers/src/java/BUILD
+++ b/heron/schedulers/src/java/BUILD
@@ -69,6 +69,7 @@ nomad_sdk_deps = [
nomad_deps_files = \
scheduler_deps_files + nomad_sdk_deps + [
":scheduler-utils-java",
+ "//heron/metricsmgr/src/java:metricsmgr-java"
]
java_library(
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadConstants.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadConstants.java
index b82cf82..bfc4fe0 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadConstants.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadConstants.java
@@ -43,6 +43,7 @@ public final class NomadConstants {
public static final String JOB_LINK = "/ui/jobs";
public static final String HOST = "HOST";
+ public static final String NETWORK_MODE = "network_mode";
public static final String NOMAD_TASK_COMMAND = "command";
public static final String NOMAD_TASK_COMMAND_ARGS = "args";
@@ -51,6 +52,7 @@ public final class NomadConstants {
public static final String NOMAD_DEFAULT_DATACENTER = "dc1";
public static final String SHELL_CMD = "/bin/sh";
public static final String NOMAD_HERON_SCRIPT_NAME = "run_heron_executor.sh";
+ public static final String NOMAD_SERVICE_CHECK_TYPE = "tcp";
public static final String HERON_NOMAD_WORKING_DIR = "HERON_NOMAD_WORKING_DIR";
public static final String HERON_USE_CORE_PACKAGE_URI = "HERON_USE_CORE_PACKAGE_URI";
@@ -88,6 +90,9 @@ public final class NomadConstants {
// port number to start with when more than one port is needed for remote debugging
public static final String JVM_REMOTE_DEBUGGER_PORT = String.format("${NOMAD_PORT_%s}",
SchedulerUtils.ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS.getName());
+ // port for metrics webserver (AbstractWebSink)
+ public static final String METRICS_PORT = "metrics_port";
+ public static final String METRICS_PORT_FILE = "METRICS_PORT_FILE";
public static final Map<SchedulerUtils.ExecutorPort, String> EXECUTOR_PORTS = new HashMap<>();
static {
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadContext.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadContext.java
index c665559..2dd7f5f 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadContext.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadContext.java
@@ -30,6 +30,20 @@ public class NomadContext extends Context {
public static final String HERON_EXECUTOR_DOCKER_IMAGE = "heron.executor.docker.image";
+ public static final String HERON_NOMAD_NETWORK_MODE = "heron.nomad.network.mode";
+
+ public static final String HERON_NOMAD_METRICS_SERVICE_REGISTER
+ = "heron.nomad.metrics.service.register";
+
+ public static final String HERON_NOMAD_METRICS_SERVICE_CHECK_INTERVAL_SEC
+ = "heron.nomad.metrics.service.check.interval.sec";
+
+ public static final String HERON_NOMAD_METRICS_SERVICE_CHECK_TIMEOUT_SEC
+ = "heron.nomad.metrics.service.check.timeout.sec";
+
+ public static final String HERON_NOMAD_METRICS_SERVICE_ADDITIONAL_TAGS
+ = "heron.nomad.metrics.service.additional.tags";
+
public static String workingDirectory(Config config) {
return config.getStringValue(
NomadKey.WORKING_DIRECTORY.value(), NomadKey.WORKING_DIRECTORY.getDefaultString());
@@ -42,7 +56,7 @@ public class NomadContext extends Context {
}
public static String getSchedulerURI(Config config) {
- return config.getStringValue(HERON_NOMAD_SCHEDULER_URI);
+ return config.getStringValue(HERON_NOMAD_SCHEDULER_URI, "http://127.0.0.1:4646");
}
public static int getCoreFreqMapping(Config config) {
@@ -50,10 +64,30 @@ public class NomadContext extends Context {
}
public static String getHeronNomadDriver(Config config) {
- return config.getStringValue(HERON_NOMAD_DRIVER);
+ return config.getStringValue(HERON_NOMAD_DRIVER, "docker");
}
public static String getHeronExecutorDockerImage(Config config) {
- return config.getStringValue(HERON_EXECUTOR_DOCKER_IMAGE);
+ return config.getStringValue(HERON_EXECUTOR_DOCKER_IMAGE, "heron/heron:latest");
+ }
+
+ public static boolean getHeronNomadMetricsServiceRegister(Config config) {
+ return config.getBooleanValue(HERON_NOMAD_METRICS_SERVICE_REGISTER, false);
+ }
+
+ public static int getHeronNomadMetricsServiceCheckIntervalSec(Config config) {
+ return config.getIntegerValue(HERON_NOMAD_METRICS_SERVICE_CHECK_INTERVAL_SEC, 10);
+ }
+
+ public static int getHeronNomadMetricsServiceCheckTimeoutSec(Config config) {
+ return config.getIntegerValue(HERON_NOMAD_METRICS_SERVICE_CHECK_TIMEOUT_SEC, 2);
+ }
+
+ public static String[] getHeronNomadMetricsServiceAdditionalTags(Config config) {
+ return config.getStringValue(HERON_NOMAD_METRICS_SERVICE_ADDITIONAL_TAGS, "").split(",");
+ }
+
+ public static String getHeronNomadNetworkMode(Config config) {
+ return config.getStringValue(HERON_NOMAD_NETWORK_MODE, "default");
}
}
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadScheduler.java
index 7531f23..2b76505 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/nomad/NomadScheduler.java
@@ -13,6 +13,7 @@
// limitations under the License.
package com.twitter.heron.scheduler.nomad;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@@ -23,6 +24,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -33,6 +35,8 @@ import com.hashicorp.nomad.apimodel.JobListStub;
import com.hashicorp.nomad.apimodel.NetworkResource;
import com.hashicorp.nomad.apimodel.Port;
import com.hashicorp.nomad.apimodel.Resources;
+import com.hashicorp.nomad.apimodel.Service;
+import com.hashicorp.nomad.apimodel.ServiceCheck;
import com.hashicorp.nomad.apimodel.Task;
import com.hashicorp.nomad.apimodel.TaskGroup;
import com.hashicorp.nomad.apimodel.Template;
@@ -42,6 +46,8 @@ import com.hashicorp.nomad.javasdk.NomadApiConfiguration;
import com.hashicorp.nomad.javasdk.NomadException;
import com.hashicorp.nomad.javasdk.ServerQueryResponse;
+import com.twitter.heron.metricsmgr.MetricsSinksConfig;
+import com.twitter.heron.metricsmgr.sink.PrometheusSink;
import com.twitter.heron.proto.scheduler.Scheduler;
import com.twitter.heron.scheduler.UpdateTopologyManager;
import com.twitter.heron.scheduler.utils.Runtime;
@@ -53,6 +59,8 @@ import com.twitter.heron.spi.packing.PackingPlan;
import com.twitter.heron.spi.packing.Resource;
import com.twitter.heron.spi.scheduler.IScheduler;
+import static com.twitter.heron.scheduler.nomad.NomadConstants.METRICS_PORT;
+
@SuppressWarnings("IllegalCatch")
public class NomadScheduler implements IScheduler {
@@ -256,7 +264,8 @@ public class NomadScheduler implements IScheduler {
i++;
}
- resourceReqs.addNetworks(new NetworkResource().addDynamicPorts(ports));
+ NetworkResource networkResource = new NetworkResource();
+ networkResource.addDynamicPorts(ports);
// set memory requirements
long memoryReqMb = containerResource.getRam().asMegabytes();
@@ -270,8 +279,44 @@ public class NomadScheduler implements IScheduler {
// set disk requirements
long diskReqMb = containerResource.getDisk().asMegabytes();
resourceReqs.setDiskMb(longToInt(diskReqMb));
- task.setResources(resourceReqs);
+ // allocate dynamic port for prometheus/websink metrics
+ String prometheusPortFile = getPrometheusMetricsFile(this.localConfig);
+ if (prometheusPortFile == null) {
+ LOG.severe("Failed to find port file for Prometheus metrics. "
+ + "Please check metrics sinks configurations");
+ } else {
+ networkResource.addDynamicPorts(new Port().setLabel(METRICS_PORT));
+ task.addEnv(NomadConstants.METRICS_PORT_FILE, prometheusPortFile);
+
+ if (NomadContext.getHeronNomadMetricsServiceRegister(this.localConfig)) {
+ // getting tags for service
+ List<String> tags = new LinkedList<>();
+ tags.add(String.format("%s-%s",
+ Runtime.topologyName(this.runtimeConfig), containerIndex));
+ tags.addAll(Arrays.asList(
+ NomadContext.getHeronNomadMetricsServiceAdditionalTags(this.localConfig)));
+ //register metrics service with consul
+ Service service = new Service()
+ .setName(
+ getMetricsServiceName(Runtime.topologyName(this.runtimeConfig), containerIndex))
+ .setPortLabel(METRICS_PORT)
+ .setTags(tags)
+ .addChecks(new ServiceCheck().setType(NomadConstants.NOMAD_SERVICE_CHECK_TYPE)
+ .setPortLabel(METRICS_PORT)
+ .setInterval(TimeUnit.NANOSECONDS.convert(
+ NomadContext.getHeronNomadMetricsServiceCheckIntervalSec(this.localConfig),
+ TimeUnit.SECONDS))
+ .setTimeout(TimeUnit.NANOSECONDS.convert(
+ NomadContext.getHeronNomadMetricsServiceCheckTimeoutSec(this.localConfig),
+ TimeUnit.SECONDS)));
+
+ task.addServices(service);
+ }
+ }
+
+ resourceReqs.addNetworks(networkResource);
+ task.setResources(resourceReqs);
return task;
}
@@ -301,7 +346,13 @@ public class NomadScheduler implements IScheduler {
NomadContext.getHeronExecutorDockerImage(this.localConfig));
task.addConfig(NomadConstants.NOMAD_TASK_COMMAND, NomadConstants.SHELL_CMD);
- String[] args = {"-c", String.format("%s && %s", topologyDownloadCmd, executorCmd)};
+ task.addConfig(NomadConstants.NETWORK_MODE,
+ NomadContext.getHeronNomadNetworkMode(this.localConfig));
+
+ String setMetricsPortFileCmd = getSetMetricsPortFileCmd();
+
+ String[] args = {"-c", String.format("%s && %s && %s",
+ topologyDownloadCmd, setMetricsPortFileCmd, executorCmd)};
task.addConfig(NomadConstants.NOMAD_TASK_COMMAND_ARGS, args);
@@ -341,7 +392,6 @@ public class NomadScheduler implements IScheduler {
template.setDestPath(NomadConstants.NOMAD_HERON_SCRIPT_NAME);
task.addTemplates(template);
- Resources resourceReqs = new Resources();
// configure nomad to allocate dynamic ports
Port[] ports = new Port[NomadConstants.EXECUTOR_PORTS.size()];
int i = 0;
@@ -517,4 +567,41 @@ public class NomadScheduler implements IScheduler {
Resource getHomogeneousContainerResource(PackingPlan homogeneousPackingPlan) {
return homogeneousPackingPlan.getContainers().iterator().next().getRequiredResource();
}
+
+ static String getPrometheusMetricsFile(Config config) {
+ MetricsSinksConfig metricsSinksConfig;
+ try {
+ metricsSinksConfig = new MetricsSinksConfig(Context.metricsSinksFile(config));
+ } catch (FileNotFoundException e) {
+ return null;
+ }
+
+ String prometheusSinkId = null;
+ Map<String, Object> prometheusSinkConfig = null;
+ for (String sinkId : metricsSinksConfig.getSinkIds()) {
+ Map<String, Object> sinkConfig = metricsSinksConfig.getConfigForSink(sinkId);
+ Object className = sinkConfig.get(MetricsSinksConfig.CONFIG_KEY_CLASSNAME);
+ if (className != null && className instanceof String) {
+ if (PrometheusSink.class.getName().equals(className)) {
+ prometheusSinkId = sinkId;
+ prometheusSinkConfig = sinkConfig;
+ }
+ }
+ }
+ if (prometheusSinkId == null || prometheusSinkConfig == null) {
+ return null;
+ }
+
+ String prometheusMetricsPortFile = PrometheusSink.getServerPortFile(prometheusSinkConfig);
+ return prometheusMetricsPortFile;
+ }
+
+ static String getMetricsServiceName(String topologyName, int containerIndex) {
+ return String.format("metrics-heron-%s-%s", topologyName, containerIndex);
+ }
+
+ static String getSetMetricsPortFileCmd() {
+ return String.format("echo ${NOMAD_PORT_%s} > ${%s}",
+ NomadConstants.METRICS_PORT, NomadConstants.METRICS_PORT_FILE);
+ }
}
diff --git a/heron/schedulers/tests/java/com/twitter/heron/scheduler/nomad/NomadSchedulerTest.java b/heron/schedulers/tests/java/com/twitter/heron/scheduler/nomad/NomadSchedulerTest.java
index d6bba66..c14d65d 100644
--- a/heron/schedulers/tests/java/com/twitter/heron/scheduler/nomad/NomadSchedulerTest.java
+++ b/heron/schedulers/tests/java/com/twitter/heron/scheduler/nomad/NomadSchedulerTest.java
@@ -18,10 +18,12 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import com.google.common.base.Optional;
import com.hashicorp.nomad.apimodel.Job;
+import com.hashicorp.nomad.apimodel.Port;
import com.hashicorp.nomad.apimodel.Task;
import com.hashicorp.nomad.apimodel.TaskGroup;
import com.hashicorp.nomad.javasdk.NomadApiClient;
@@ -73,6 +75,7 @@ public class NomadSchedulerTest {
private static final String CORE_PACKAGE_URI = "core-package-uri";
private static final Boolean USE_CORE_PACKAGE_URI = true;
private static final String EXECUTOR_BINARY = "executor-binary";
+ private static final String PORT_FILE = "port-file";
private static NomadScheduler scheduler;
@@ -91,6 +94,7 @@ public class NomadSchedulerTest {
.put(Key.USE_CORE_PACKAGE_URI, USE_CORE_PACKAGE_URI)
.put(Key.EXECUTOR_BINARY, EXECUTOR_BINARY)
.put(NomadContext.HERON_NOMAD_DRIVER, NomadConstants.NomadDriver.RAW_EXEC.getName())
+ .put(NomadContext.HERON_NOMAD_NETWORK_MODE, "default")
.build();
this.mockRuntime = config;
@@ -322,6 +326,8 @@ public class NomadSchedulerTest {
.thenReturn((int) MEMORY_RESOURCE.asMegabytes());
PowerMockito.when(NomadScheduler.longToInt(DISK_RESOURCE.asMegabytes()))
.thenReturn((int) DISK_RESOURCE.asMegabytes());
+ PowerMockito.when(NomadScheduler.getPrometheusMetricsFile(Mockito.any()))
+ .thenReturn(PORT_FILE);
scheduler.initialize(this.mockConfig, this.mockRuntime);
@@ -352,6 +358,7 @@ public class NomadSchedulerTest {
Assert.assertTrue(task.getEnv().containsKey(NomadConstants.HERON_CORE_PACKAGE_URI));
Assert.assertTrue(task.getEnv().containsKey(NomadConstants.HERON_TOPOLOGY_DOWNLOAD_CMD));
Assert.assertTrue(task.getEnv().containsKey(NomadConstants.HERON_EXECUTOR_CMD));
+ Assert.assertTrue(task.getEnv().containsKey(NomadConstants.METRICS_PORT_FILE));
Assert.assertEquals(NomadKey.WORKING_DIRECTORY.getDefaultString() + "/container-"
+ String.valueOf(CONTAINER_INDEX),
@@ -364,17 +371,19 @@ public class NomadSchedulerTest {
task.getEnv().get(NomadConstants.HERON_TOPOLOGY_DOWNLOAD_CMD));
Assert.assertEquals("./heron-core/bin/heron-executor args1 args2",
task.getEnv().get(NomadConstants.HERON_EXECUTOR_CMD));
+ Assert.assertEquals(PORT_FILE,
+ task.getEnv().get(NomadConstants.METRICS_PORT_FILE));
}
@SuppressWarnings("unchecked")
@Test
public void testGetTaskDocker() {
- this.mockRuntime = this.mockRuntime.newBuilder()
+ this.mockRuntime = this.mockRuntime.newBuilder().putAll(this.mockRuntime)
.put(NomadContext.HERON_NOMAD_DRIVER, NomadConstants.NomadDriver.DOCKER.getName())
.build();
- this.mockConfig = this.mockConfig.newBuilder()
+ this.mockConfig = this.mockConfig.newBuilder().putAll(this.mockConfig)
.put(NomadContext.HERON_NOMAD_DRIVER, NomadConstants.NomadDriver.DOCKER.getName())
.build();
@@ -398,6 +407,8 @@ public class NomadSchedulerTest {
.thenReturn((int) MEMORY_RESOURCE.asMegabytes());
PowerMockito.when(NomadScheduler.longToInt(DISK_RESOURCE.asMegabytes()))
.thenReturn((int) DISK_RESOURCE.asMegabytes());
+ PowerMockito.when(NomadScheduler.getPrometheusMetricsFile(Mockito.any()))
+ .thenReturn(PORT_FILE);
scheduler.initialize(this.mockConfig, this.mockRuntime);
@@ -409,6 +420,8 @@ public class NomadSchedulerTest {
Assert.assertTrue(task.getConfig().containsKey(NomadConstants.NOMAD_TASK_COMMAND));
Assert.assertEquals(NomadConstants.SHELL_CMD,
task.getConfig().get(NomadConstants.NOMAD_TASK_COMMAND));
+ Assert.assertTrue(task.getEnv().containsKey(NomadConstants.METRICS_PORT_FILE));
+
Assert.assertEquals((int) CPU_RESOURCE * HERON_NOMAD_CORE_FREQ_MAPPING,
task.getResources().getCpu().intValue());
@@ -417,8 +430,102 @@ public class NomadSchedulerTest {
Assert.assertEquals((int) DISK_RESOURCE.asMegabytes(),
task.getResources().getDiskMb().intValue());
Assert.assertTrue(task.getEnv().containsKey(NomadConstants.HOST));
-
+ Assert.assertEquals(PORT_FILE,
+ task.getEnv().get(NomadConstants.METRICS_PORT_FILE));
Assert.assertEquals("${attr.unique.network.ip-address}",
task.getEnv().get(NomadConstants.HOST));
+ Assert.assertTrue(task.getConfig().containsKey(NomadConstants.NETWORK_MODE));
+ Assert.assertEquals("default",
+ task.getConfig().get(NomadConstants.NETWORK_MODE));
+
+ Assert.assertEquals(task.getResources().getNetworks().size(), 1);
+
+ Set<String> ports = new HashSet<>();
+ for (Port entry : task.getResources().getNetworks().get(0).getDynamicPorts()) {
+ ports.add(entry.getLabel());
+ Assert.assertEquals(entry.getValue(), 0);
+ }
+
+ for (SchedulerUtils.ExecutorPort entry : NomadConstants.EXECUTOR_PORTS.keySet()) {
+ Assert.assertTrue(ports.contains(entry.getName().replace("-", "_")));
+ }
+ Assert.assertTrue(ports.contains(NomadConstants.METRICS_PORT));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testServiceCheck() {
+ this.mockConfig = this.mockConfig.newBuilder().putAll(this.mockConfig)
+ .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_ADDITIONAL_TAGS, "foo,bar")
+ .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_CHECK_TIMEOUT_SEC, "2")
+ .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_CHECK_INTERVAL_SEC, "10")
+ .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_REGISTER, true).build();
+
+ this.mockRuntime = this.mockRuntime.newBuilder().putAll(this.mockConfig)
+ .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_ADDITIONAL_TAGS, "foo,bar")
+ .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_CHECK_TIMEOUT_SEC, "2")
+ .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_CHECK_INTERVAL_SEC, "10")
+ .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_REGISTER, true).build();
+
+ Set<PackingPlan.ContainerPlan> containers = new HashSet<>();
+ containers.add(Mockito.mock(PackingPlan.ContainerPlan.class));
+
+ PowerMockito.mockStatic(SchedulerUtils.class);
+
+ Resource resource = new Resource(CPU_RESOURCE, MEMORY_RESOURCE, DISK_RESOURCE);
+
+ PowerMockito.when(SchedulerUtils.executorCommandArgs(
+ Mockito.any(), Mockito.any(), Mockito.anyMap(), Mockito.anyString()))
+ .thenReturn(EXECUTOR_CMD_ARGS);
+
+ PowerMockito.mockStatic(NomadScheduler.class);
+ PowerMockito.when(NomadScheduler.getFetchCommand(Mockito.any(), Mockito.any()))
+ .thenReturn(TOPOLOGY_DOWNLOAD_CMD);
+ PowerMockito.when(NomadScheduler.getHeronNomadScript(this.mockConfig))
+ .thenReturn(HERON_NOMAD_SCRIPT);
+ PowerMockito.when(NomadScheduler.longToInt(MEMORY_RESOURCE.asMegabytes()))
+ .thenReturn((int) MEMORY_RESOURCE.asMegabytes());
+ PowerMockito.when(NomadScheduler.longToInt(DISK_RESOURCE.asMegabytes()))
+ .thenReturn((int) DISK_RESOURCE.asMegabytes());
+ PowerMockito.when(NomadScheduler.getPrometheusMetricsFile(Mockito.any()))
+ .thenReturn(PORT_FILE);
+ PowerMockito.when(NomadScheduler.getMetricsServiceName(Mockito.any(), Mockito.anyInt()))
+ .thenReturn(String.format("metrics-heron-%s-%s", TOPOLOGY_NAME, CONTAINER_INDEX));
+
+ scheduler.initialize(this.mockConfig, this.mockRuntime);
+
+ Task task = scheduler.getTask(TASK_NAME, CONTAINER_INDEX, resource);
+ LOG.info("task: " + task);
+
+ Assert.assertEquals(task.getServices().size(), 1);
+ Assert.assertEquals(task.getServices().get(0).getName(),
+ String.format("metrics-heron-%s-%s", TOPOLOGY_NAME, CONTAINER_INDEX));
+
+ String[] tags = {String.format("%s-%s", TOPOLOGY_NAME, CONTAINER_INDEX), "foo", "bar"};
+ Assert.assertEquals(task.getServices().get(0).getTags(), Arrays.asList(tags));
+ Assert.assertEquals(task.getServices().get(0).getPortLabel(), NomadConstants.METRICS_PORT);
+ Assert.assertEquals(task.getServices().get(0).getChecks().size(), 1);
+ Assert.assertEquals(task.getServices().get(0).getChecks().get(0).getPortLabel(),
+ NomadConstants.METRICS_PORT);
+ Assert.assertEquals(task.getServices().get(0).getChecks().get(0).getType(),
+ NomadConstants.NOMAD_SERVICE_CHECK_TYPE);
+ TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS);
+ Assert.assertEquals(task.getServices().get(0).getChecks().get(0).getInterval(),
+ TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS));
+ Assert.assertEquals(task.getServices().get(0).getChecks().get(0).getTimeout(),
+ TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS));
+
+
+ // if service registration is turned off
+ this.mockConfig = this.mockConfig.newBuilder().putAll(this.mockConfig)
+ .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_REGISTER, false).build();
+
+ this.mockRuntime = this.mockRuntime.newBuilder().putAll(this.mockConfig)
+ .put(NomadContext.HERON_NOMAD_METRICS_SERVICE_REGISTER, false).build();
+ scheduler.initialize(this.mockConfig, this.mockRuntime);
+
+ task = scheduler.getTask(TASK_NAME, CONTAINER_INDEX, resource);
+ LOG.info("task: " + task);
+ Assert.assertTrue(task.getServices() == null);
}
}
--
To stop receiving notification emails like this one, please contact
karthikz@apache.org.