You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/01/20 17:15:26 UTC
[nifi] branch main updated: NIFI-9587 Added JSON format for Prometheus Flow Metrics
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new ccd47de NIFI-9587 Added JSON format for Prometheus Flow Metrics
ccd47de is described below
commit ccd47de6dca4f9be90bfdbeb193c81e85b953170
Author: Timea Barna <ti...@gmail.com>
AuthorDate: Wed Jan 19 08:35:09 2022 +0100
NIFI-9587 Added JSON format for Prometheus Flow Metrics
This closes #5673
Signed-off-by: David Handermann <ex...@apache.org>
---
.../java/org/apache/nifi/web/api/FlowResource.java | 17 +-
...r.java => AbstractPrometheusMetricsWriter.java} | 28 +--
...java => JsonFormatPrometheusMetricsWriter.java} | 56 +++--
.../metrics/TextFormatPrometheusMetricsWriter.java | 27 +--
.../nifi/web/api/request/FlowMetricsProducer.java | 3 +-
.../org/apache/nifi/web/api/TestFlowResource.java | 249 +++++++++++++++++++--
6 files changed, 287 insertions(+), 93 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index 79a31d0..0431ebc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -111,6 +111,7 @@ import org.apache.nifi.web.api.entity.VersionedFlowEntity;
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataSetEntity;
import org.apache.nifi.web.api.entity.VersionedFlowsEntity;
+import org.apache.nifi.web.api.metrics.JsonFormatPrometheusMetricsWriter;
import org.apache.nifi.web.api.metrics.TextFormatPrometheusMetricsWriter;
import org.apache.nifi.web.api.metrics.PrometheusMetricsWriter;
import org.apache.nifi.web.api.request.BulletinBoardPatternParameter;
@@ -428,7 +429,11 @@ public class FlowResource extends ApplicationResource {
@ApiParam(
value = "Regular Expression Pattern to be applied against the sample label value field"
)
- @QueryParam("sampleLabelValue") final String sampleLabelValue
+ @QueryParam("sampleLabelValue") final String sampleLabelValue,
+ @ApiParam(
+ value = "Name of the first field of JSON object. Applicable for JSON producer only."
+ )
+ @QueryParam("rootFieldName") final String rootFieldName
) {
authorizeFlow();
@@ -442,6 +447,16 @@ public class FlowResource extends ApplicationResource {
prometheusMetricsWriter.write(registries, outputStream);
});
return generateOkResponse(response).type(TextFormat.CONTENT_TYPE_004).build();
+
+ } else if (FlowMetricsProducer.JSON.getProducer().equals(producer)) {
+ final StreamingOutput output = outputStream -> {
+ final JsonFormatPrometheusMetricsWriter jsonPrometheusMetricsWriter = new JsonFormatPrometheusMetricsWriter(sampleName, sampleLabelValue, rootFieldName);
+ jsonPrometheusMetricsWriter.write(registries, outputStream);
+ };
+ return generateOkResponse(output)
+ .type(MediaType.APPLICATION_JSON_TYPE)
+ .build();
+
} else {
throw new ResourceNotFoundException("The specified producer is missing or invalid.");
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/AbstractPrometheusMetricsWriter.java
similarity index 63%
copy from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java
copy to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/AbstractPrometheusMetricsWriter.java
index 9631fd4..aca66fd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/AbstractPrometheusMetricsWriter.java
@@ -18,29 +18,18 @@ package org.apache.nifi.web.api.metrics;
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
-import io.prometheus.client.exporter.common.TextFormat;
import org.apache.commons.lang3.StringUtils;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.util.Collection;
import java.util.Enumeration;
import java.util.regex.Pattern;
-/**
- * Prometheus Metrics Writer supporting Prometheus Text Version 0.0.4 with optional filtering
- */
-public class TextFormatPrometheusMetricsWriter implements PrometheusMetricsWriter {
+public abstract class AbstractPrometheusMetricsWriter implements PrometheusMetricsWriter {
private final Pattern sampleNamePattern;
private final Pattern sampleLabelValuePattern;
private final boolean filteringDisabled;
- public TextFormatPrometheusMetricsWriter(
+ public AbstractPrometheusMetricsWriter(
final String sampleName,
final String sampleLabelValue
) {
@@ -49,18 +38,7 @@ public class TextFormatPrometheusMetricsWriter implements PrometheusMetricsWrite
this.filteringDisabled = StringUtils.isAllBlank(sampleName, sampleLabelValue);
}
- @Override
- public void write(final Collection<CollectorRegistry> registries, final OutputStream outputStream) throws IOException {
- try (final Writer writer = new BufferedWriter(new OutputStreamWriter(outputStream))) {
- for (final CollectorRegistry collectorRegistry : registries) {
- final Enumeration<Collector.MetricFamilySamples> samples = getSamples(collectorRegistry);
- TextFormat.write004(writer, samples);
- writer.flush();
- }
- }
- }
-
- private Enumeration<Collector.MetricFamilySamples> getSamples(final CollectorRegistry registry) {
+ Enumeration<Collector.MetricFamilySamples> getSamples(final CollectorRegistry registry) {
final Enumeration<Collector.MetricFamilySamples> samples = registry.metricFamilySamples();
return filteringDisabled ? samples : new FilteringMetricFamilySamplesEnumeration(
samples,
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/JsonFormatPrometheusMetricsWriter.java
similarity index 51%
copy from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java
copy to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/JsonFormatPrometheusMetricsWriter.java
index 9631fd4..131b9fd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/JsonFormatPrometheusMetricsWriter.java
@@ -16,10 +16,11 @@
*/
package org.apache.nifi.web.api.metrics;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
-import io.prometheus.client.exporter.common.TextFormat;
-import org.apache.commons.lang3.StringUtils;
import java.io.BufferedWriter;
import java.io.IOException;
@@ -28,44 +29,41 @@ import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.Collection;
import java.util.Enumeration;
-import java.util.regex.Pattern;
/**
- * Prometheus Metrics Writer supporting Prometheus Text Version 0.0.4 with optional filtering
+ * Prometheus Metrics Writer with JSON output format and optional filtering
*/
-public class TextFormatPrometheusMetricsWriter implements PrometheusMetricsWriter {
- private final Pattern sampleNamePattern;
+public class JsonFormatPrometheusMetricsWriter extends AbstractPrometheusMetricsWriter {
+ private final String rootFieldName;
- private final Pattern sampleLabelValuePattern;
-
- private final boolean filteringDisabled;
-
- public TextFormatPrometheusMetricsWriter(
- final String sampleName,
- final String sampleLabelValue
- ) {
- this.sampleNamePattern = StringUtils.isBlank(sampleName) ? null : Pattern.compile(sampleName);
- this.sampleLabelValuePattern = StringUtils.isBlank(sampleLabelValue) ? null : Pattern.compile(sampleLabelValue);
- this.filteringDisabled = StringUtils.isAllBlank(sampleName, sampleLabelValue);
+ public JsonFormatPrometheusMetricsWriter(final String sampleName, final String sampleLabelValue, final String rootFieldName) {
+ super(sampleName, sampleLabelValue);
+ this.rootFieldName = rootFieldName == null ? "samples" : rootFieldName;
}
@Override
public void write(final Collection<CollectorRegistry> registries, final OutputStream outputStream) throws IOException {
- try (final Writer writer = new BufferedWriter(new OutputStreamWriter(outputStream))) {
+ JsonFactory factory = new JsonFactory();
+ try (final Writer writer = new BufferedWriter(new OutputStreamWriter(outputStream));
+ final JsonGenerator generator = factory.createGenerator(writer)) {
+ generator.setCodec(new ObjectMapper());
+ generator.writeStartObject();
+ generator.writeFieldName(rootFieldName);
+ generator.writeStartArray();
for (final CollectorRegistry collectorRegistry : registries) {
final Enumeration<Collector.MetricFamilySamples> samples = getSamples(collectorRegistry);
- TextFormat.write004(writer, samples);
- writer.flush();
+ while (samples.hasMoreElements()) {
+ final Collector.MetricFamilySamples samples2 = samples.nextElement();
+ for (Collector.MetricFamilySamples.Sample sample : samples2.samples) {
+ generator.writeObject(sample);
+ generator.flush();
+ }
+ generator.flush();
+ }
}
+ generator.writeEndArray();
+ generator.writeEndObject();
+ generator.flush();
}
}
-
- private Enumeration<Collector.MetricFamilySamples> getSamples(final CollectorRegistry registry) {
- final Enumeration<Collector.MetricFamilySamples> samples = registry.metricFamilySamples();
- return filteringDisabled ? samples : new FilteringMetricFamilySamplesEnumeration(
- samples,
- sampleNamePattern,
- sampleLabelValuePattern
- );
- }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java
index 9631fd4..621c5ee 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java
@@ -19,7 +19,6 @@ package org.apache.nifi.web.api.metrics;
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.common.TextFormat;
-import org.apache.commons.lang3.StringUtils;
import java.io.BufferedWriter;
import java.io.IOException;
@@ -28,25 +27,14 @@ import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.Collection;
import java.util.Enumeration;
-import java.util.regex.Pattern;
/**
* Prometheus Metrics Writer supporting Prometheus Text Version 0.0.4 with optional filtering
*/
-public class TextFormatPrometheusMetricsWriter implements PrometheusMetricsWriter {
- private final Pattern sampleNamePattern;
+public class TextFormatPrometheusMetricsWriter extends AbstractPrometheusMetricsWriter {
- private final Pattern sampleLabelValuePattern;
-
- private final boolean filteringDisabled;
-
- public TextFormatPrometheusMetricsWriter(
- final String sampleName,
- final String sampleLabelValue
- ) {
- this.sampleNamePattern = StringUtils.isBlank(sampleName) ? null : Pattern.compile(sampleName);
- this.sampleLabelValuePattern = StringUtils.isBlank(sampleLabelValue) ? null : Pattern.compile(sampleLabelValue);
- this.filteringDisabled = StringUtils.isAllBlank(sampleName, sampleLabelValue);
+ public TextFormatPrometheusMetricsWriter(final String sampleName, final String sampleLabelValue) {
+ super(sampleName, sampleLabelValue);
}
@Override
@@ -59,13 +47,4 @@ public class TextFormatPrometheusMetricsWriter implements PrometheusMetricsWrite
}
}
}
-
- private Enumeration<Collector.MetricFamilySamples> getSamples(final CollectorRegistry registry) {
- final Enumeration<Collector.MetricFamilySamples> samples = registry.metricFamilySamples();
- return filteringDisabled ? samples : new FilteringMetricFamilySamplesEnumeration(
- samples,
- sampleNamePattern,
- sampleLabelValuePattern
- );
- }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsProducer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsProducer.java
index e34c2cd..96eec2f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsProducer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsProducer.java
@@ -20,7 +20,8 @@ package org.apache.nifi.web.api.request;
* Flow Metrics Producers supported
*/
public enum FlowMetricsProducer {
- PROMETHEUS("prometheus");
+ PROMETHEUS("prometheus"),
+ JSON("json");
private final String producer;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java
index bc7fdf1..6689a99 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java
@@ -16,10 +16,21 @@
*/
package org.apache.nifi.web.api;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.common.TextFormat;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
+import org.apache.nifi.prometheus.util.BulletinMetricsRegistry;
+import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
+import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.ResourceNotFoundException;
@@ -36,10 +47,18 @@ import javax.ws.rs.core.StreamingOutput;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -51,20 +70,19 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class TestFlowResource {
private static final String LABEL_VALUE = TestFlowResource.class.getSimpleName();
-
private static final String OTHER_LABEL_VALUE = JmxJvmMetrics.class.getSimpleName();
-
private static final String THREAD_COUNT_NAME = "nifi_jvm_thread_count";
-
private static final String HEAP_USAGE_NAME = "nifi_jvm_heap_usage";
-
private static final String HEAP_USED_NAME = "nifi_jvm_heap_used";
-
private static final String HEAP_STARTS_WITH_PATTERN = "nifi_jvm_heap.*";
-
private static final String THREAD_COUNT_LABEL = String.format("nifi_jvm_thread_count{instance=\"%s\"", LABEL_VALUE);
-
private static final String THREAD_COUNT_OTHER_LABEL = String.format("nifi_jvm_thread_count{instance=\"%s\"", OTHER_LABEL_VALUE);
+ private static final String ROOT_FIELD_NAME = "beans";
+ private static final String SAMPLE_NAME_JVM = ".*jvm.*";
+ private static final String SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP = "RootProcessGroup";
+ private static final String SAMPLE_LABEL_VALUES_PROCESS_GROUP = "ProcessGroup";
+ private static final String COMPONENT_TYPE_LABEL = "component_type";
+ private static final int COMPONENT_TYPE_VALUE_INDEX = 1;
@InjectMocks
private FlowResource resource = new FlowResource();
@@ -74,7 +92,7 @@ public class TestFlowResource {
@Test
public void testGetFlowMetricsProducerInvalid() {
- assertThrows(ResourceNotFoundException.class, () -> resource.getFlowMetrics(String.class.toString(), Collections.emptySet(), null, null));
+ assertThrows(ResourceNotFoundException.class, () -> resource.getFlowMetrics(String.class.toString(), Collections.emptySet(), null, null, null));
}
@Test
@@ -82,7 +100,7 @@ public class TestFlowResource {
final List<CollectorRegistry> registries = getCollectorRegistries();
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
- final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), null, null);
+ final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), null, null, null);
assertNotNull(response);
assertEquals(MediaType.valueOf(TextFormat.CONTENT_TYPE_004), response.getMediaType());
@@ -98,7 +116,7 @@ public class TestFlowResource {
final List<CollectorRegistry> registries = getCollectorRegistries();
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
- final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), THREAD_COUNT_NAME, null);
+ final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), THREAD_COUNT_NAME, null, null);
assertNotNull(response);
assertEquals(MediaType.valueOf(TextFormat.CONTENT_TYPE_004), response.getMediaType());
@@ -114,7 +132,7 @@ public class TestFlowResource {
final List<CollectorRegistry> registries = getCollectorRegistries();
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
- final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), HEAP_STARTS_WITH_PATTERN, null);
+ final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), HEAP_STARTS_WITH_PATTERN, null, null);
assertNotNull(response);
assertEquals(MediaType.valueOf(TextFormat.CONTENT_TYPE_004), response.getMediaType());
@@ -131,7 +149,7 @@ public class TestFlowResource {
final List<CollectorRegistry> registries = getCollectorRegistries();
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
- final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), null, LABEL_VALUE);
+ final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), null, LABEL_VALUE, null);
assertNotNull(response);
assertEquals(MediaType.valueOf(TextFormat.CONTENT_TYPE_004), response.getMediaType());
@@ -147,7 +165,7 @@ public class TestFlowResource {
final List<CollectorRegistry> registries = getCollectorRegistries();
when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
- final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), THREAD_COUNT_NAME, LABEL_VALUE);
+ final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), THREAD_COUNT_NAME, LABEL_VALUE, null);
assertNotNull(response);
assertEquals(MediaType.valueOf(TextFormat.CONTENT_TYPE_004), response.getMediaType());
@@ -160,6 +178,110 @@ public class TestFlowResource {
assertTrue(output.contains(HEAP_USAGE_NAME), "Heap Usage name not found");
}
+ @Test
+ public void testGetFlowMetricsPrometheusAsJson() throws IOException {
+ final List<CollectorRegistry> registries = getCollectorRegistriesForJson();
+ when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
+
+ final Response response = resource.getFlowMetrics(FlowMetricsProducer.JSON.getProducer(), Collections.emptySet(), null, null, ROOT_FIELD_NAME);
+
+ assertNotNull(response);
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getMediaType());
+
+ final Map<String, List<Sample>> metrics = convertJsonResponseToMap(response);
+ assertThat(metrics.keySet(), hasSize(1));
+ assertThat(metrics, hasKey(ROOT_FIELD_NAME));
+
+ final List<Sample> registryList = metrics.get(ROOT_FIELD_NAME);
+ assertThat(registryList, hasSize(9));
+
+ final Map<String, Long> result = getResult(registryList);
+ assertThat(3L, equalTo(result.get(SAMPLE_NAME_JVM)));
+ assertThat(4L, equalTo(result.get(SAMPLE_LABEL_VALUES_PROCESS_GROUP)));
+ assertThat(2L, equalTo(result.get(SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP)));
+ }
+
+ @Test
+ public void testGetFlowMetricsPrometheusAsJsonSampleName() throws IOException {
+ final List<CollectorRegistry> registries = getCollectorRegistriesForJson();
+ when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
+
+ final Response response = resource.getFlowMetrics(FlowMetricsProducer.JSON.getProducer(), Collections.emptySet(), SAMPLE_NAME_JVM, null, ROOT_FIELD_NAME);
+ assertNotNull(response);
+ assertEquals(MediaType.valueOf(MediaType.APPLICATION_JSON), response.getMediaType());
+
+ final Map<String, List<Sample>> metrics = convertJsonResponseToMap(response);
+ assertThat(metrics.keySet(), hasSize(1));
+ assertThat(metrics, hasKey(ROOT_FIELD_NAME));
+
+ final List<Sample> registryList = metrics.get(ROOT_FIELD_NAME);
+ assertThat(registryList, hasSize(3));
+
+ final Map<String, Long> result = getResult(registryList);
+ assertThat(3L, equalTo(result.get(SAMPLE_NAME_JVM)));
+ }
+
+ @Test
+ public void testGetFlowMetricsPrometheusAsJsonSampleNameStartsWithPattern() throws IOException {
+ final List<CollectorRegistry> registries = getCollectorRegistriesForJson();
+ when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
+
+ final Response response = resource.getFlowMetrics(FlowMetricsProducer.JSON.getProducer(), Collections.emptySet(), HEAP_STARTS_WITH_PATTERN, null, ROOT_FIELD_NAME);
+ assertNotNull(response);
+ assertEquals(MediaType.valueOf(MediaType.APPLICATION_JSON), response.getMediaType());
+
+ final Map<String, List<Sample>> metrics = convertJsonResponseToMap(response);
+ assertThat(metrics.keySet(), hasSize(1));
+ assertThat(metrics, hasKey(ROOT_FIELD_NAME));
+
+ final List<Sample> registryList = metrics.get(ROOT_FIELD_NAME);
+ assertThat(registryList, hasSize(2));
+
+ final Map<String, Long> result = getResult(registryList);
+ assertThat(2L, equalTo(result.get(SAMPLE_NAME_JVM)));
+ }
+
+ @Test
+ public void testGetFlowMetricsPrometheusAsJsonSampleLabelValue() throws IOException {
+ final List<CollectorRegistry> registries = getCollectorRegistriesForJson();
+ when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
+
+ final Response response = resource.getFlowMetrics(FlowMetricsProducer.JSON.getProducer(), Collections.emptySet(), null, SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP, ROOT_FIELD_NAME);
+ assertNotNull(response);
+ assertEquals(MediaType.valueOf(MediaType.APPLICATION_JSON), response.getMediaType());
+
+ final Map<String, List<Sample>> metrics = convertJsonResponseToMap(response);
+ assertThat(metrics.keySet(), hasSize(1));
+ assertThat(metrics, hasKey(ROOT_FIELD_NAME));
+
+ final List<Sample> registryList = metrics.get(ROOT_FIELD_NAME);
+ assertThat(registryList, hasSize(2));
+
+ final Map<String, Long> result = getResult(registryList);
+ assertThat(2L, equalTo(result.get(SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP)));
+ }
+
+ @Test
+ public void testGetFlowMetricsPrometheusAsJsonSampleNameAndSampleLabelValue() throws IOException {
+ final List<CollectorRegistry> registries = getCollectorRegistriesForJson();
+ when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
+
+ final Response response = resource.getFlowMetrics(FlowMetricsProducer.JSON.getProducer(), Collections.emptySet(), SAMPLE_NAME_JVM, SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP, ROOT_FIELD_NAME);
+ assertNotNull(response);
+ assertEquals(MediaType.valueOf(MediaType.APPLICATION_JSON), response.getMediaType());
+
+ final Map<String, List<Sample>> metrics = convertJsonResponseToMap(response);
+ assertThat(metrics.keySet(), hasSize(1));
+ assertThat(metrics, hasKey(ROOT_FIELD_NAME));
+
+ final List<Sample> registryList = metrics.get(ROOT_FIELD_NAME);
+ assertThat(registryList, hasSize(5));
+
+ final Map<String, Long> result = getResult(registryList);
+ assertThat(3L, equalTo(result.get(SAMPLE_NAME_JVM)));
+ assertThat(2L, equalTo(result.get(SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP)));
+ }
+
private String getResponseOutput(final Response response) throws IOException {
final StreamingOutput streamingOutput = (StreamingOutput) response.getEntity();
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
@@ -174,4 +296,105 @@ public class TestFlowResource {
final CollectorRegistry otherJvmCollectorRegistry = PrometheusMetricsUtil.createJvmMetrics(jvmMetricsRegistry, JmxJvmMetrics.getInstance(), OTHER_LABEL_VALUE);
return Arrays.asList(jvmCollectorRegistry, otherJvmCollectorRegistry);
}
+
+ private Map<String, List<Sample>> convertJsonResponseToMap(final Response response) throws IOException {
+ final TypeReference<HashMap<String, List<Sample>>> typeReference = new TypeReference<HashMap<String, List<Sample>>>() {};
+ final ObjectMapper mapper = new ObjectMapper();
+ final SimpleModule module = new SimpleModule();
+
+ module.addDeserializer(Sample.class, new SampleDeserializer());
+ mapper.registerModule(module);
+
+ final String json = getResponseOutput(response);
+ return mapper.readValue(json, typeReference);
+ }
+
+ private Map<String, Long> getResult(final List<Sample> registries) {
+ return registries.stream()
+ .collect(Collectors.groupingBy(
+ sample -> getResultKey(sample),
+ Collectors.counting()));
+ }
+
+ private String getResultKey(final Sample sample) {
+ return sample.labelNames.contains(COMPONENT_TYPE_LABEL) ? sample.labelValues.get(COMPONENT_TYPE_VALUE_INDEX) : SAMPLE_NAME_JVM;
+ }
+
+ private static List<CollectorRegistry> getCollectorRegistriesForJson() {
+ final List<CollectorRegistry> registryList = new ArrayList<>();
+
+ registryList.add(getNifiMetricsRegistry());
+ registryList.add(getConnectionMetricsRegistry());
+ registryList.add(getJvmMetricsRegistry());
+ registryList.add(getBulletinMetricsRegistry());
+
+ return registryList;
+
+ }
+
+ private static CollectorRegistry getNifiMetricsRegistry() {
+ final NiFiMetricsRegistry registry = new NiFiMetricsRegistry();
+
+ registry.setDataPoint(136, "TOTAL_BYTES_READ",
+ "RootPGId", SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP, "rootPGName", "", "");
+ registry.setDataPoint(136, "TOTAL_BYTES_READ",
+ "PGId", SAMPLE_LABEL_VALUES_PROCESS_GROUP, "PGName", "RootPGId", "");
+
+ return registry.getRegistry();
+ }
+
+ private static CollectorRegistry getConnectionMetricsRegistry() {
+ final ConnectionAnalyticsMetricsRegistry connectionMetricsRegistry = new ConnectionAnalyticsMetricsRegistry();
+
+ connectionMetricsRegistry.setDataPoint(1.0,
+ "TIME_TO_BYTES_BACKPRESSURE_PREDICTION",
+ "PGId", SAMPLE_LABEL_VALUES_PROCESS_GROUP, "success", "connComponentId",
+ "RootPGId", "sourceId", "sourceName", "destinationId", "destinationName");
+ connectionMetricsRegistry.setDataPoint(1.0,
+ "TIME_TO_BYTES_BACKPRESSURE_PREDICTION",
+ "RootPGId", SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP, "rootPGName", "", "", "", "", "", "");
+
+ return connectionMetricsRegistry.getRegistry();
+ }
+
+ private static CollectorRegistry getJvmMetricsRegistry() {
+ final JvmMetricsRegistry jvmMetricsRegistry = new JvmMetricsRegistry();
+
+ jvmMetricsRegistry.setDataPoint(4.0, "JVM_HEAP_USED", "instanceId");
+ jvmMetricsRegistry.setDataPoint(6.0, "JVM_HEAP_USAGE", "instanceId");
+ jvmMetricsRegistry.setDataPoint(10.0, "JVM_THREAD_COUNT", "instanceId");
+
+ return jvmMetricsRegistry.getRegistry();
+ }
+
+ private static CollectorRegistry getBulletinMetricsRegistry() {
+ final BulletinMetricsRegistry bulletinMetricsRegistry = new BulletinMetricsRegistry();
+
+ bulletinMetricsRegistry.setDataPoint(1, "BULLETIN", "B1Id", SAMPLE_LABEL_VALUES_PROCESS_GROUP, "PGId", "RootPGId",
+ "nodeAddress", "category", "sourceName", "sourceId", "level");
+ bulletinMetricsRegistry.setDataPoint(1, "BULLETIN", "B2Id", SAMPLE_LABEL_VALUES_PROCESS_GROUP, "PGId", "RootPGId",
+ "nodeAddress", "category", "sourceName", "sourceId", "level");
+
+ return bulletinMetricsRegistry.getRegistry();
+ }
+
+ private static class SampleDeserializer extends StdDeserializer<Sample> {
+ protected SampleDeserializer() {
+ super(Sample.class);
+ }
+
+ @Override
+ public Sample deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
+ final JsonNode node = jsonParser.getCodec().readTree(jsonParser);
+
+ final String name = node.get("name").asText();
+ final List<String> labelNames = new ArrayList<>();
+ node.get("labelNames").elements().forEachRemaining(e -> labelNames.add(e.asText()));
+ final List<String> labelValues = new ArrayList<>();
+ node.get("labelValues").elements().forEachRemaining(e -> labelValues.add(e.asText()));
+ final double value = node.get("value").asDouble();
+
+ return new Sample(name, labelNames, labelValues, value);
+ }
+ }
}
\ No newline at end of file