You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/01/20 17:15:26 UTC

[nifi] branch main updated: NIFI-9587 Added JSON format for Prometheus Flow Metrics

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new ccd47de  NIFI-9587 Added JSON format for Prometheus Flow Metrics
ccd47de is described below

commit ccd47de6dca4f9be90bfdbeb193c81e85b953170
Author: Timea Barna <ti...@gmail.com>
AuthorDate: Wed Jan 19 08:35:09 2022 +0100

    NIFI-9587 Added JSON format for Prometheus Flow Metrics
    
    This closes #5673
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../java/org/apache/nifi/web/api/FlowResource.java |  17 +-
 ...r.java => AbstractPrometheusMetricsWriter.java} |  28 +--
 ...java => JsonFormatPrometheusMetricsWriter.java} |  56 +++--
 .../metrics/TextFormatPrometheusMetricsWriter.java |  27 +--
 .../nifi/web/api/request/FlowMetricsProducer.java  |   3 +-
 .../org/apache/nifi/web/api/TestFlowResource.java  | 249 +++++++++++++++++++--
 6 files changed, 287 insertions(+), 93 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index 79a31d0..0431ebc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -111,6 +111,7 @@ import org.apache.nifi.web.api.entity.VersionedFlowEntity;
 import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
 import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataSetEntity;
 import org.apache.nifi.web.api.entity.VersionedFlowsEntity;
+import org.apache.nifi.web.api.metrics.JsonFormatPrometheusMetricsWriter;
 import org.apache.nifi.web.api.metrics.TextFormatPrometheusMetricsWriter;
 import org.apache.nifi.web.api.metrics.PrometheusMetricsWriter;
 import org.apache.nifi.web.api.request.BulletinBoardPatternParameter;
@@ -428,7 +429,11 @@ public class FlowResource extends ApplicationResource {
             @ApiParam(
                     value = "Regular Expression Pattern to be applied against the sample label value field"
             )
-            @QueryParam("sampleLabelValue") final String sampleLabelValue
+            @QueryParam("sampleLabelValue") final String sampleLabelValue,
+            @ApiParam(
+                    value = "Name of the first field of JSON object. Applicable for JSON producer only."
+            )
+            @QueryParam("rootFieldName") final String rootFieldName
     ) {
 
         authorizeFlow();
@@ -442,6 +447,16 @@ public class FlowResource extends ApplicationResource {
                 prometheusMetricsWriter.write(registries, outputStream);
             });
             return generateOkResponse(response).type(TextFormat.CONTENT_TYPE_004).build();
+
+        } else if (FlowMetricsProducer.JSON.getProducer().equals(producer)) {
+            final StreamingOutput output = outputStream -> {
+                final JsonFormatPrometheusMetricsWriter jsonPrometheusMetricsWriter = new JsonFormatPrometheusMetricsWriter(sampleName, sampleLabelValue, rootFieldName);
+                jsonPrometheusMetricsWriter.write(registries, outputStream);
+            };
+            return generateOkResponse(output)
+                    .type(MediaType.APPLICATION_JSON_TYPE)
+                    .build();
+
         } else {
             throw new ResourceNotFoundException("The specified producer is missing or invalid.");
         }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/AbstractPrometheusMetricsWriter.java
similarity index 63%
copy from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java
copy to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/AbstractPrometheusMetricsWriter.java
index 9631fd4..aca66fd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/AbstractPrometheusMetricsWriter.java
@@ -18,29 +18,18 @@ package org.apache.nifi.web.api.metrics;
 
 import io.prometheus.client.Collector;
 import io.prometheus.client.CollectorRegistry;
-import io.prometheus.client.exporter.common.TextFormat;
 import org.apache.commons.lang3.StringUtils;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.util.Collection;
 import java.util.Enumeration;
 import java.util.regex.Pattern;
 
-/**
- * Prometheus Metrics Writer supporting Prometheus Text Version 0.0.4 with optional filtering
- */
-public class TextFormatPrometheusMetricsWriter implements PrometheusMetricsWriter {
+public abstract class AbstractPrometheusMetricsWriter implements PrometheusMetricsWriter {
     private final Pattern sampleNamePattern;
 
     private final Pattern sampleLabelValuePattern;
 
     private final boolean filteringDisabled;
 
-    public TextFormatPrometheusMetricsWriter(
+    public AbstractPrometheusMetricsWriter(
             final String sampleName,
             final String sampleLabelValue
     ) {
@@ -49,18 +38,7 @@ public class TextFormatPrometheusMetricsWriter implements PrometheusMetricsWrite
         this.filteringDisabled = StringUtils.isAllBlank(sampleName, sampleLabelValue);
     }
 
-    @Override
-    public void write(final Collection<CollectorRegistry> registries, final OutputStream outputStream) throws IOException {
-        try (final Writer writer = new BufferedWriter(new OutputStreamWriter(outputStream))) {
-            for (final CollectorRegistry collectorRegistry : registries) {
-                final Enumeration<Collector.MetricFamilySamples> samples = getSamples(collectorRegistry);
-                TextFormat.write004(writer, samples);
-                writer.flush();
-            }
-        }
-    }
-
-    private Enumeration<Collector.MetricFamilySamples> getSamples(final CollectorRegistry registry) {
+    Enumeration<Collector.MetricFamilySamples> getSamples(final CollectorRegistry registry) {
         final Enumeration<Collector.MetricFamilySamples> samples = registry.metricFamilySamples();
         return filteringDisabled ? samples : new FilteringMetricFamilySamplesEnumeration(
                 samples,
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/JsonFormatPrometheusMetricsWriter.java
similarity index 51%
copy from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java
copy to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/JsonFormatPrometheusMetricsWriter.java
index 9631fd4..131b9fd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/JsonFormatPrometheusMetricsWriter.java
@@ -16,10 +16,11 @@
  */
 package org.apache.nifi.web.api.metrics;
 
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import io.prometheus.client.Collector;
 import io.prometheus.client.CollectorRegistry;
-import io.prometheus.client.exporter.common.TextFormat;
-import org.apache.commons.lang3.StringUtils;
 
 import java.io.BufferedWriter;
 import java.io.IOException;
@@ -28,44 +29,41 @@ import java.io.OutputStreamWriter;
 import java.io.Writer;
 import java.util.Collection;
 import java.util.Enumeration;
-import java.util.regex.Pattern;
 
 /**
- * Prometheus Metrics Writer supporting Prometheus Text Version 0.0.4 with optional filtering
+ * Prometheus Metrics Writer with JSON output format and optional filtering
  */
-public class TextFormatPrometheusMetricsWriter implements PrometheusMetricsWriter {
-    private final Pattern sampleNamePattern;
+public class JsonFormatPrometheusMetricsWriter extends AbstractPrometheusMetricsWriter {
+    private final String rootFieldName;
 
-    private final Pattern sampleLabelValuePattern;
-
-    private final boolean filteringDisabled;
-
-    public TextFormatPrometheusMetricsWriter(
-            final String sampleName,
-            final String sampleLabelValue
-    ) {
-        this.sampleNamePattern = StringUtils.isBlank(sampleName) ? null : Pattern.compile(sampleName);
-        this.sampleLabelValuePattern = StringUtils.isBlank(sampleLabelValue) ? null : Pattern.compile(sampleLabelValue);
-        this.filteringDisabled = StringUtils.isAllBlank(sampleName, sampleLabelValue);
+    public JsonFormatPrometheusMetricsWriter(final String sampleName, final String sampleLabelValue, final String rootFieldName) {
+        super(sampleName, sampleLabelValue);
+        this.rootFieldName = rootFieldName == null ? "samples" : rootFieldName;
     }
 
     @Override
     public void write(final Collection<CollectorRegistry> registries, final OutputStream outputStream) throws IOException {
-        try (final Writer writer = new BufferedWriter(new OutputStreamWriter(outputStream))) {
+        JsonFactory factory = new JsonFactory();
+        try (final Writer writer = new BufferedWriter(new OutputStreamWriter(outputStream));
+        final JsonGenerator generator = factory.createGenerator(writer)) {
+            generator.setCodec(new ObjectMapper());
+            generator.writeStartObject();
+            generator.writeFieldName(rootFieldName);
+            generator.writeStartArray();
             for (final CollectorRegistry collectorRegistry : registries) {
                 final Enumeration<Collector.MetricFamilySamples> samples = getSamples(collectorRegistry);
-                TextFormat.write004(writer, samples);
-                writer.flush();
+                while (samples.hasMoreElements()) {
+                    final Collector.MetricFamilySamples samples2 = samples.nextElement();
+                    for (Collector.MetricFamilySamples.Sample sample : samples2.samples) {
+                        generator.writeObject(sample);
+                        generator.flush();
+                    }
+                    generator.flush();
+                }
             }
+            generator.writeEndArray();
+            generator.writeEndObject();
+            generator.flush();
         }
     }
-
-    private Enumeration<Collector.MetricFamilySamples> getSamples(final CollectorRegistry registry) {
-        final Enumeration<Collector.MetricFamilySamples> samples = registry.metricFamilySamples();
-        return filteringDisabled ? samples : new FilteringMetricFamilySamplesEnumeration(
-                samples,
-                sampleNamePattern,
-                sampleLabelValuePattern
-        );
-    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java
index 9631fd4..621c5ee 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java
@@ -19,7 +19,6 @@ package org.apache.nifi.web.api.metrics;
 import io.prometheus.client.Collector;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.exporter.common.TextFormat;
-import org.apache.commons.lang3.StringUtils;
 
 import java.io.BufferedWriter;
 import java.io.IOException;
@@ -28,25 +27,14 @@ import java.io.OutputStreamWriter;
 import java.io.Writer;
 import java.util.Collection;
 import java.util.Enumeration;
-import java.util.regex.Pattern;
 
 /**
  * Prometheus Metrics Writer supporting Prometheus Text Version 0.0.4 with optional filtering
  */
-public class TextFormatPrometheusMetricsWriter implements PrometheusMetricsWriter {
-    private final Pattern sampleNamePattern;
+public class TextFormatPrometheusMetricsWriter extends AbstractPrometheusMetricsWriter {
 
-    private final Pattern sampleLabelValuePattern;
-
-    private final boolean filteringDisabled;
-
-    public TextFormatPrometheusMetricsWriter(
-            final String sampleName,
-            final String sampleLabelValue
-    ) {
-        this.sampleNamePattern = StringUtils.isBlank(sampleName) ? null : Pattern.compile(sampleName);
-        this.sampleLabelValuePattern = StringUtils.isBlank(sampleLabelValue) ? null : Pattern.compile(sampleLabelValue);
-        this.filteringDisabled = StringUtils.isAllBlank(sampleName, sampleLabelValue);
+    public TextFormatPrometheusMetricsWriter(final String sampleName, final String sampleLabelValue) {
+        super(sampleName, sampleLabelValue);
     }
 
     @Override
@@ -59,13 +47,4 @@ public class TextFormatPrometheusMetricsWriter implements PrometheusMetricsWrite
             }
         }
     }
-
-    private Enumeration<Collector.MetricFamilySamples> getSamples(final CollectorRegistry registry) {
-        final Enumeration<Collector.MetricFamilySamples> samples = registry.metricFamilySamples();
-        return filteringDisabled ? samples : new FilteringMetricFamilySamplesEnumeration(
-                samples,
-                sampleNamePattern,
-                sampleLabelValuePattern
-        );
-    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsProducer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsProducer.java
index e34c2cd..96eec2f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsProducer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsProducer.java
@@ -20,7 +20,8 @@ package org.apache.nifi.web.api.request;
  * Flow Metrics Producers supported
  */
 public enum FlowMetricsProducer {
-    PROMETHEUS("prometheus");
+    PROMETHEUS("prometheus"),
+    JSON("json");
 
     private final String producer;
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java
index bc7fdf1..6689a99 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java
@@ -16,10 +16,21 @@
  */
 package org.apache.nifi.web.api;
 
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.exporter.common.TextFormat;
 import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
+import org.apache.nifi.prometheus.util.BulletinMetricsRegistry;
+import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
 import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
+import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
 import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.ResourceNotFoundException;
@@ -36,10 +47,18 @@ import javax.ws.rs.core.StreamingOutput;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -51,20 +70,19 @@ import static org.mockito.Mockito.when;
 @ExtendWith(MockitoExtension.class)
 public class TestFlowResource {
     private static final String LABEL_VALUE = TestFlowResource.class.getSimpleName();
-
     private static final String OTHER_LABEL_VALUE = JmxJvmMetrics.class.getSimpleName();
-
     private static final String THREAD_COUNT_NAME = "nifi_jvm_thread_count";
-
     private static final String HEAP_USAGE_NAME = "nifi_jvm_heap_usage";
-
     private static final String HEAP_USED_NAME = "nifi_jvm_heap_used";
-
     private static final String HEAP_STARTS_WITH_PATTERN = "nifi_jvm_heap.*";
-
     private static final String THREAD_COUNT_LABEL = String.format("nifi_jvm_thread_count{instance=\"%s\"", LABEL_VALUE);
-
     private static final String THREAD_COUNT_OTHER_LABEL = String.format("nifi_jvm_thread_count{instance=\"%s\"", OTHER_LABEL_VALUE);
+    private static final String ROOT_FIELD_NAME = "beans";
+    private static final String SAMPLE_NAME_JVM = ".*jvm.*";
+    private static final String SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP = "RootProcessGroup";
+    private static final String SAMPLE_LABEL_VALUES_PROCESS_GROUP = "ProcessGroup";
+    private static final String COMPONENT_TYPE_LABEL = "component_type";
+    private static final int COMPONENT_TYPE_VALUE_INDEX = 1;
 
     @InjectMocks
     private FlowResource resource = new FlowResource();
@@ -74,7 +92,7 @@ public class TestFlowResource {
 
     @Test
     public void testGetFlowMetricsProducerInvalid() {
-        assertThrows(ResourceNotFoundException.class, () -> resource.getFlowMetrics(String.class.toString(), Collections.emptySet(), null, null));
+        assertThrows(ResourceNotFoundException.class, () -> resource.getFlowMetrics(String.class.toString(), Collections.emptySet(), null, null, null));
     }
 
     @Test
@@ -82,7 +100,7 @@ public class TestFlowResource {
         final List<CollectorRegistry> registries = getCollectorRegistries();
         when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
 
-        final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), null, null);
+        final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), null, null, null);
 
         assertNotNull(response);
         assertEquals(MediaType.valueOf(TextFormat.CONTENT_TYPE_004), response.getMediaType());
@@ -98,7 +116,7 @@ public class TestFlowResource {
         final List<CollectorRegistry> registries = getCollectorRegistries();
         when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
 
-        final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), THREAD_COUNT_NAME, null);
+        final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), THREAD_COUNT_NAME, null, null);
 
         assertNotNull(response);
         assertEquals(MediaType.valueOf(TextFormat.CONTENT_TYPE_004), response.getMediaType());
@@ -114,7 +132,7 @@ public class TestFlowResource {
         final List<CollectorRegistry> registries = getCollectorRegistries();
         when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
 
-        final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), HEAP_STARTS_WITH_PATTERN, null);
+        final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), HEAP_STARTS_WITH_PATTERN, null, null);
 
         assertNotNull(response);
         assertEquals(MediaType.valueOf(TextFormat.CONTENT_TYPE_004), response.getMediaType());
@@ -131,7 +149,7 @@ public class TestFlowResource {
         final List<CollectorRegistry> registries = getCollectorRegistries();
         when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
 
-        final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), null, LABEL_VALUE);
+        final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), null, LABEL_VALUE, null);
 
         assertNotNull(response);
         assertEquals(MediaType.valueOf(TextFormat.CONTENT_TYPE_004), response.getMediaType());
@@ -147,7 +165,7 @@ public class TestFlowResource {
         final List<CollectorRegistry> registries = getCollectorRegistries();
         when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
 
-        final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), THREAD_COUNT_NAME, LABEL_VALUE);
+        final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), THREAD_COUNT_NAME, LABEL_VALUE, null);
 
         assertNotNull(response);
         assertEquals(MediaType.valueOf(TextFormat.CONTENT_TYPE_004), response.getMediaType());
@@ -160,6 +178,110 @@ public class TestFlowResource {
         assertTrue(output.contains(HEAP_USAGE_NAME), "Heap Usage name not found");
     }
 
+    @Test
+    public void testGetFlowMetricsPrometheusAsJson() throws IOException {
+        final List<CollectorRegistry> registries = getCollectorRegistriesForJson();
+        when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
+
+        final Response response = resource.getFlowMetrics(FlowMetricsProducer.JSON.getProducer(), Collections.emptySet(), null, null, ROOT_FIELD_NAME);
+
+        assertNotNull(response);
+        assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getMediaType());
+
+        final Map<String, List<Sample>> metrics = convertJsonResponseToMap(response);
+        assertThat(metrics.keySet(), hasSize(1));
+        assertThat(metrics, hasKey(ROOT_FIELD_NAME));
+
+        final List<Sample> registryList = metrics.get(ROOT_FIELD_NAME);
+        assertThat(registryList, hasSize(9));
+
+        final Map<String, Long> result = getResult(registryList);
+        assertThat(3L, equalTo(result.get(SAMPLE_NAME_JVM)));
+        assertThat(4L, equalTo(result.get(SAMPLE_LABEL_VALUES_PROCESS_GROUP)));
+        assertThat(2L, equalTo(result.get(SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP)));
+    }
+
+    @Test
+    public void testGetFlowMetricsPrometheusAsJsonSampleName() throws IOException {
+        final List<CollectorRegistry> registries = getCollectorRegistriesForJson();
+        when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
+
+        final Response response = resource.getFlowMetrics(FlowMetricsProducer.JSON.getProducer(), Collections.emptySet(), SAMPLE_NAME_JVM, null, ROOT_FIELD_NAME);
+        assertNotNull(response);
+        assertEquals(MediaType.valueOf(MediaType.APPLICATION_JSON), response.getMediaType());
+
+        final Map<String, List<Sample>> metrics = convertJsonResponseToMap(response);
+        assertThat(metrics.keySet(), hasSize(1));
+        assertThat(metrics, hasKey(ROOT_FIELD_NAME));
+
+        final List<Sample> registryList = metrics.get(ROOT_FIELD_NAME);
+        assertThat(registryList, hasSize(3));
+
+        final Map<String, Long> result = getResult(registryList);
+        assertThat(3L, equalTo(result.get(SAMPLE_NAME_JVM)));
+    }
+
+    @Test
+    public void testGetFlowMetricsPrometheusAsJsonSampleNameStartsWithPattern() throws IOException {
+        final List<CollectorRegistry> registries = getCollectorRegistriesForJson();
+        when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
+
+        final Response response = resource.getFlowMetrics(FlowMetricsProducer.JSON.getProducer(), Collections.emptySet(), HEAP_STARTS_WITH_PATTERN, null, ROOT_FIELD_NAME);
+        assertNotNull(response);
+        assertEquals(MediaType.valueOf(MediaType.APPLICATION_JSON), response.getMediaType());
+
+        final Map<String, List<Sample>> metrics = convertJsonResponseToMap(response);
+        assertThat(metrics.keySet(), hasSize(1));
+        assertThat(metrics, hasKey(ROOT_FIELD_NAME));
+
+        final List<Sample> registryList = metrics.get(ROOT_FIELD_NAME);
+        assertThat(registryList, hasSize(2));
+
+        final Map<String, Long> result = getResult(registryList);
+        assertThat(2L, equalTo(result.get(SAMPLE_NAME_JVM)));
+    }
+
+    @Test
+    public void testGetFlowMetricsPrometheusAsJsonSampleLabelValue() throws IOException {
+        final List<CollectorRegistry> registries = getCollectorRegistriesForJson();
+        when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
+
+        final Response response = resource.getFlowMetrics(FlowMetricsProducer.JSON.getProducer(), Collections.emptySet(), null, SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP, ROOT_FIELD_NAME);
+        assertNotNull(response);
+        assertEquals(MediaType.valueOf(MediaType.APPLICATION_JSON), response.getMediaType());
+
+        final Map<String, List<Sample>> metrics = convertJsonResponseToMap(response);
+        assertThat(metrics.keySet(), hasSize(1));
+        assertThat(metrics, hasKey(ROOT_FIELD_NAME));
+
+        final List<Sample> registryList = metrics.get(ROOT_FIELD_NAME);
+        assertThat(registryList, hasSize(2));
+
+        final Map<String, Long> result = getResult(registryList);
+        assertThat(2L, equalTo(result.get(SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP)));
+    }
+
+    @Test
+    public void testGetFlowMetricsPrometheusAsJsonSampleNameAndSampleLabelValue() throws IOException {
+        final List<CollectorRegistry> registries = getCollectorRegistriesForJson();
+        when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
+
+        final Response response = resource.getFlowMetrics(FlowMetricsProducer.JSON.getProducer(), Collections.emptySet(), SAMPLE_NAME_JVM, SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP, ROOT_FIELD_NAME);
+        assertNotNull(response);
+        assertEquals(MediaType.valueOf(MediaType.APPLICATION_JSON), response.getMediaType());
+
+        final Map<String, List<Sample>> metrics = convertJsonResponseToMap(response);
+        assertThat(metrics.keySet(), hasSize(1));
+        assertThat(metrics, hasKey(ROOT_FIELD_NAME));
+
+        final List<Sample> registryList = metrics.get(ROOT_FIELD_NAME);
+        assertThat(registryList, hasSize(5));
+
+        final Map<String, Long> result = getResult(registryList);
+        assertThat(3L, equalTo(result.get(SAMPLE_NAME_JVM)));
+        assertThat(2L, equalTo(result.get(SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP)));
+    }
+
     private String getResponseOutput(final Response response) throws IOException {
         final StreamingOutput streamingOutput = (StreamingOutput) response.getEntity();
         final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
@@ -174,4 +296,105 @@ public class TestFlowResource {
         final CollectorRegistry otherJvmCollectorRegistry = PrometheusMetricsUtil.createJvmMetrics(jvmMetricsRegistry, JmxJvmMetrics.getInstance(), OTHER_LABEL_VALUE);
         return Arrays.asList(jvmCollectorRegistry, otherJvmCollectorRegistry);
     }
+
+    private Map<String, List<Sample>> convertJsonResponseToMap(final Response response) throws IOException {
+        final TypeReference<HashMap<String, List<Sample>>> typeReference = new TypeReference<HashMap<String, List<Sample>>>() {};
+        final ObjectMapper mapper = new ObjectMapper();
+        final SimpleModule module = new SimpleModule();
+
+        module.addDeserializer(Sample.class, new SampleDeserializer());
+        mapper.registerModule(module);
+
+        final String json = getResponseOutput(response);
+        return mapper.readValue(json, typeReference);
+    }
+
+    private Map<String, Long> getResult(final List<Sample> registries) {
+        return registries.stream()
+                .collect(Collectors.groupingBy(
+                        sample -> getResultKey(sample),
+                        Collectors.counting()));
+    }
+
+    private String getResultKey(final Sample sample) {
+        return sample.labelNames.contains(COMPONENT_TYPE_LABEL) ? sample.labelValues.get(COMPONENT_TYPE_VALUE_INDEX) : SAMPLE_NAME_JVM;
+    }
+
+    private static List<CollectorRegistry> getCollectorRegistriesForJson() {
+        final List<CollectorRegistry> registryList = new ArrayList<>();
+
+        registryList.add(getNifiMetricsRegistry());
+        registryList.add(getConnectionMetricsRegistry());
+        registryList.add(getJvmMetricsRegistry());
+        registryList.add(getBulletinMetricsRegistry());
+
+        return registryList;
+
+    }
+
+    private static CollectorRegistry getNifiMetricsRegistry() {
+        final NiFiMetricsRegistry registry = new NiFiMetricsRegistry();
+
+        registry.setDataPoint(136, "TOTAL_BYTES_READ",
+                "RootPGId", SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP, "rootPGName", "", "");
+        registry.setDataPoint(136, "TOTAL_BYTES_READ",
+                "PGId", SAMPLE_LABEL_VALUES_PROCESS_GROUP, "PGName", "RootPGId", "");
+
+        return registry.getRegistry();
+    }
+
+    private static CollectorRegistry getConnectionMetricsRegistry() {
+        final ConnectionAnalyticsMetricsRegistry connectionMetricsRegistry = new ConnectionAnalyticsMetricsRegistry();
+
+        connectionMetricsRegistry.setDataPoint(1.0,
+                "TIME_TO_BYTES_BACKPRESSURE_PREDICTION",
+                "PGId", SAMPLE_LABEL_VALUES_PROCESS_GROUP, "success", "connComponentId",
+                "RootPGId", "sourceId", "sourceName", "destinationId", "destinationName");
+        connectionMetricsRegistry.setDataPoint(1.0,
+                "TIME_TO_BYTES_BACKPRESSURE_PREDICTION",
+                "RootPGId", SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP, "rootPGName", "", "", "", "", "", "");
+
+        return connectionMetricsRegistry.getRegistry();
+    }
+
+    private static CollectorRegistry getJvmMetricsRegistry() {
+        final JvmMetricsRegistry jvmMetricsRegistry = new JvmMetricsRegistry();
+
+        jvmMetricsRegistry.setDataPoint(4.0, "JVM_HEAP_USED", "instanceId");
+        jvmMetricsRegistry.setDataPoint(6.0, "JVM_HEAP_USAGE", "instanceId");
+        jvmMetricsRegistry.setDataPoint(10.0, "JVM_THREAD_COUNT", "instanceId");
+
+        return jvmMetricsRegistry.getRegistry();
+    }
+
+    private static CollectorRegistry getBulletinMetricsRegistry() {
+        final BulletinMetricsRegistry bulletinMetricsRegistry = new BulletinMetricsRegistry();
+
+        bulletinMetricsRegistry.setDataPoint(1, "BULLETIN", "B1Id", SAMPLE_LABEL_VALUES_PROCESS_GROUP, "PGId", "RootPGId",
+                "nodeAddress", "category", "sourceName", "sourceId", "level");
+        bulletinMetricsRegistry.setDataPoint(1, "BULLETIN", "B2Id", SAMPLE_LABEL_VALUES_PROCESS_GROUP, "PGId", "RootPGId",
+                "nodeAddress", "category", "sourceName", "sourceId", "level");
+
+        return bulletinMetricsRegistry.getRegistry();
+    }
+
+    private static class SampleDeserializer extends StdDeserializer<Sample> {
+        protected SampleDeserializer() {
+            super(Sample.class);
+        }
+
+        @Override
+        public Sample deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
+            final JsonNode node = jsonParser.getCodec().readTree(jsonParser);
+
+            final String name = node.get("name").asText();
+            final List<String> labelNames = new ArrayList<>();
+            node.get("labelNames").elements().forEachRemaining(e -> labelNames.add(e.asText()));
+            final List<String> labelValues = new ArrayList<>();
+            node.get("labelValues").elements().forEachRemaining(e -> labelValues.add(e.asText()));
+            final double value = node.get("value").asDouble();
+
+            return new Sample(name, labelNames, labelValues, value);
+        }
+    }
 }
\ No newline at end of file