You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/11/08 21:03:23 UTC

[GitHub] [nifi] javiroman opened a new pull request #4651: NIFI-7988 Prometheus Remote Write Processor

javiroman opened a new pull request #4651:
URL: https://github.com/apache/nifi/pull/4651


   Thank you for submitting a contribution to Apache NiFi.
   
   Please provide a short description of the PR here:
   
   #### Description of PR
   
   _Enables X functionality; fixes bug NIFI-YYYY._
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [x] Is there a JIRA ticket associated with this PR? Is it referenced 
        in the commit message?
   
   - [ x] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [x ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [x] Have you written or updated unit tests to verify your changes?
   - [x] Have you verified that the full build is successful on JDK 8?
   - [x] Have you verified that the full build is successful on JDK 11?
   - [x] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] javiroman closed pull request #4651: NIFI-7988 Prometheus Remote Write Processor

Posted by GitBox <gi...@apache.org>.
javiroman closed pull request #4651:
URL: https://github.com/apache/nifi/pull/4651


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] javiroman commented on pull request #4651: NIFI-7988 Prometheus Remote Write Processor

Posted by GitBox <gi...@apache.org>.
javiroman commented on pull request #4651:
URL: https://github.com/apache/nifi/pull/4651#issuecomment-782605394


   I'm going to close this PR in order to fix with a full rework and avoid the conflicts with the new main release.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] mattyb149 commented on a change in pull request #4651: NIFI-7988 Prometheus Remote Write Processor

Posted by GitBox <gi...@apache.org>.
mattyb149 commented on a change in pull request #4651:
URL: https://github.com/apache/nifi/pull/4651#discussion_r534304482



##########
File path: nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-processors/pom.xml
##########
@@ -0,0 +1,133 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-prometheus-bundle</artifactId>
+        <version>1.13.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-prometheus-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <properties>
+        <jetty.version>9.3.9.v20160517</jetty.version>
+        <protobuf.version>3.13.0</protobuf.version>
+        <snappy.version>1.1.7.1</snappy.version>
+        <maven.protobuf.version>0.6.1</maven.protobuf.version>
+        <maven.os.version>1.6.2</maven.os.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java-util</artifactId>
+            <version>${protobuf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>

Review comment:
       This needs an entry in the NOTICE file for the NAR, see examples in other NARs (such as nifi-avro-nar)

##########
File path: nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-processors/pom.xml
##########
@@ -0,0 +1,133 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-prometheus-bundle</artifactId>
+        <version>1.13.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-prometheus-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <properties>
+        <jetty.version>9.3.9.v20160517</jetty.version>
+        <protobuf.version>3.13.0</protobuf.version>
+        <snappy.version>1.1.7.1</snappy.version>
+        <maven.protobuf.version>0.6.1</maven.protobuf.version>
+        <maven.os.version>1.6.2</maven.os.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java-util</artifactId>
+            <version>${protobuf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>${snappy.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-server</artifactId>
+            <version>${jetty.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-servlet</artifactId>
+            <version>${jetty.version}</version>
+        </dependency>
+       <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-client</artifactId>
+            <version>${jetty.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <extensions>
+          <extension>
+            <groupId>kr.motd.maven</groupId>
+            <artifactId>os-maven-plugin</artifactId>
+            <version>${maven.os.version}</version>
+          </extension>
+        </extensions>
+        <plugins>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+                <version>${maven.protobuf.version}</version>
+                <configuration>
+                    <protocArtifact>com.google.protobuf:protoc:3.13.0:exe:${os.detected.classifier}</protocArtifact>
+                    <protoSourceRoot>${basedir}/src/main/resources/proto</protoSourceRoot>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>test-compile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <exclude>**/src/main/resources/proto/gogoproto/gogo.proto</exclude>

Review comment:
       gogo.proto has a license header, does it not pass the Rat check?

##########
File path: nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-processors/src/main/java/org/apache/nifi/processors/prometheus/PrometheusRemoteWrite.java
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.prometheus;
+
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import com.google.gson.Gson;
+import com.google.protobuf.util.JsonFormat;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.xerial.snappy.SnappyInputStream;
+import prometheus.Remote.WriteRequest;
+import prometheus.Types;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({ "prometheus", "metrics", "adapter", "remote write" })
+@CapabilityDescription("Listen for incoming samples from Prometheus." +
+        " Implements a remote endpoint adapter for writing Prometheus" +
+        " samples, primarily intended for metrics long term storage")
+@WritesAttributes({@WritesAttribute(attribute = "mime.type", description = "This is always application/json.")})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+public class PrometheusRemoteWrite extends AbstractProcessor {
+
+    public static final PropertyDescriptor REMOTE_WRITE_CONTEXT = new PropertyDescriptor
+            .Builder().name("Remote Write Context")
+            .displayName("Remote Write URL context")
+            .description("The context used in the remote_write url property in Prometheus")
+            .required(true)
+            .defaultValue("/write")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor REMOTE_WRITE_PORT = new PropertyDescriptor
+            .Builder().name("Remote Write Port")
+            .displayName("Remote Write URL port")
+            .description("The port used in the remote_write url property in Prometheus")
+            .required(true)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_BATCH_METRICS = new PropertyDescriptor
+            .Builder().name("Max Batch Metrics")
+            .displayName("Max Batch Metrics")
+            .description("Number of max metrics in one FlowFile. When is not set, a FlowFile per metric is emitted")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All content that is received is routed to the 'success' relationship")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("All content that is received is routed to the 'failure' relationship")
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static Server serverEndpoint;
+
+    // Maximum threads spawn, defaults to 200 max, min 8 threads
+    private static final int JETTY_MAX_THREADS = 500;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
+        descriptors.add(REMOTE_WRITE_CONTEXT);
+        descriptors.add(REMOTE_WRITE_PORT);
+        descriptors.add(MAX_BATCH_METRICS);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<Relationship>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() throws Exception {
+        getLogger().debug("onUnscheduled called");
+        if (serverEndpoint != null) {
+            serverEndpoint.stop();
+        }
+    }
+
+    @OnShutdown
+    public void onShutdown() throws Exception {
+        getLogger().debug("onShutdown called");
+        if (serverEndpoint != null) {
+            serverEndpoint.stop();
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final int port = context.getProperty(REMOTE_WRITE_PORT).asInteger();
+        final String contextPath = context.getProperty(REMOTE_WRITE_CONTEXT).getValue();
+        final int maxBatch;
+
+        if (context.getProperty(MAX_BATCH_METRICS).isSet()) {
+            maxBatch = context.getProperty(MAX_BATCH_METRICS).asInteger();
+        } else {
+           maxBatch = 0;
+        }
+
+        getLogger().debug("onTrigger called");
+
+        // Internal Jetty thread pool tuning
+        QueuedThreadPool threadPool = new QueuedThreadPool();
+        threadPool.setMaxThreads(JETTY_MAX_THREADS);
+        serverEndpoint = new Server(threadPool);
+
+        ServerConnector connector = new ServerConnector(serverEndpoint);
+        connector.setPort(port);
+        serverEndpoint.addConnector(connector);
+
+        // Setup only one handler serving one context path
+        ContextHandler contextHandler = new ContextHandler();
+        contextHandler.setContextPath(contextPath);
+        contextHandler.setAllowNullPathInfo(true);
+        contextHandler.setHandler(new PrometheusHandler(context, session, maxBatch));
+        serverEndpoint.setHandler(contextHandler);
+
+        try {
+            serverEndpoint.start();
+            serverEndpoint.join();
+        } catch (Exception e) {
+            context.yield();
+            e.printStackTrace();
+        }
+    }
+
+    private class PrometheusHandler extends AbstractHandler {
+        private final JsonFormat.Printer JSON_PRINTER = JsonFormat.printer();
+        private ProcessSession session;
+        private ProcessContext context;
+        private FlowFile flowfile;
+        private int maxBatch;
+        private List<Metrics> metricList = new ArrayList<>();
+
+        public PrometheusHandler(ProcessContext context, ProcessSession session, int maxBatch) {
+            super();
+            this.session = session;
+            this.context = context;
+            this.maxBatch = maxBatch;
+        }
+
+        public void commitFlowFile(FlowFile flowFile, ProcessSession session, HttpServletRequest request){
+            session.putAttribute(flowfile, "mime.type","application/json");
+            session.transfer(flowfile, REL_SUCCESS);
+            session.getProvenanceReporter().receive(flowfile, request.getRequestURI());
+            session.commit();
+            getLogger().debug("SUCCESS relation FlowFile: {}.", new Object[]{flowfile.getId()});
+        }
+
+        @Override
+        public void handle(String target,
+                           Request baseRequest,
+                           HttpServletRequest request,
+                           HttpServletResponse response) throws IOException,ServletException {
+
+            // Uncompress the request on the fly: Snappy compressed protocol buffer message.
+            try (SnappyInputStream is = new SnappyInputStream(baseRequest.getInputStream())) {
+
+                if (is == null) {
+                     getLogger().error("InputStream is null");
+                }
+
+                ServletOutputStream responseOut = response.getOutputStream();
+                response.setStatus(HttpServletResponse.SC_OK);
+
+                WriteRequest writeRequest = WriteRequest.parseFrom(is);
+
+                for (Types.TimeSeries timeSeries: writeRequest.getTimeseriesList()) {
+                    List<MetricLabel> labelsList = new ArrayList<>();
+                    List<MetricSample> sampleList = new ArrayList<>();
+                    Metrics metrics = new Metrics();
+                    Gson gson = new Gson();

Review comment:
       IMO it would be better to use a RecordWriter (specified by the user) for output rather than only JSON, that way the user can decide which format (JSON, Avro, CSV, XML, etc.) to output the metrics in.

##########
File path: nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-processors/src/main/java/org/apache/nifi/processors/prometheus/PrometheusRemoteWrite.java
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.prometheus;
+
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import com.google.gson.Gson;

Review comment:
       Is there a compelling reason to use Gson here? Most components in NiFi use Jackson as it is faster

##########
File path: nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-processors/src/main/java/org/apache/nifi/processors/prometheus/PrometheusRemoteWrite.java
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.prometheus;
+
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import com.google.gson.Gson;
+import com.google.protobuf.util.JsonFormat;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.xerial.snappy.SnappyInputStream;
+import prometheus.Remote.WriteRequest;
+import prometheus.Types;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({ "prometheus", "metrics", "adapter", "remote write" })
+@CapabilityDescription("Listen for incoming samples from Prometheus." +
+        " Implements a remote endpoint adapter for writing Prometheus" +
+        " samples, primarily intended for metrics long term storage")
+@WritesAttributes({@WritesAttribute(attribute = "mime.type", description = "This is always application/json.")})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+public class PrometheusRemoteWrite extends AbstractProcessor {
+
+    public static final PropertyDescriptor REMOTE_WRITE_CONTEXT = new PropertyDescriptor
+            .Builder().name("Remote Write Context")
+            .displayName("Remote Write URL context")
+            .description("The context used in the remote_write url property in Prometheus")
+            .required(true)
+            .defaultValue("/write")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor REMOTE_WRITE_PORT = new PropertyDescriptor
+            .Builder().name("Remote Write Port")
+            .displayName("Remote Write URL port")
+            .description("The port used in the remote_write url property in Prometheus")
+            .required(true)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_BATCH_METRICS = new PropertyDescriptor
+            .Builder().name("Max Batch Metrics")
+            .displayName("Max Batch Metrics")
+            .description("Number of max metrics in one FlowFile. When is not set, a FlowFile per metric is emitted")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All content that is received is routed to the 'success' relationship")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("All content that is received is routed to the 'failure' relationship")
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static Server serverEndpoint;
+
+    // Maximum threads spawn, defaults to 200 max, min 8 threads
+    private static final int JETTY_MAX_THREADS = 500;

Review comment:
       Should this be a configurable property for the user to set?

##########
File path: nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-processors/src/test/java/org/apache/nifi/processors/prometheus/PrometheusRemoteWriteBatchTest.java
##########
@@ -0,0 +1,147 @@
+/*

Review comment:
       Even though GitHub Actions passed, these tests fail for me locally with a "No route to host" exception. The host/port should be fixed if a server is started for the test, or if a Prometheus instance is required to run the test, it should be an integration test instead (ending in `IT` rather than `Test`)

##########
File path: nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-processors/src/main/java/org/apache/nifi/processors/prometheus/PrometheusRemoteWrite.java
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.prometheus;
+
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import com.google.gson.Gson;
+import com.google.protobuf.util.JsonFormat;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.xerial.snappy.SnappyInputStream;
+import prometheus.Remote.WriteRequest;
+import prometheus.Types;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({ "prometheus", "metrics", "adapter", "remote write" })
+@CapabilityDescription("Listen for incoming samples from Prometheus." +
+        " Implements a remote endpoint adapter for writing Prometheus" +
+        " samples, primarily intended for metrics long term storage")
+@WritesAttributes({@WritesAttribute(attribute = "mime.type", description = "This is always application/json.")})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+public class PrometheusRemoteWrite extends AbstractProcessor {
+
+    public static final PropertyDescriptor REMOTE_WRITE_CONTEXT = new PropertyDescriptor
+            .Builder().name("Remote Write Context")
+            .displayName("Remote Write URL context")
+            .description("The context used in the remote_write url property in Prometheus")
+            .required(true)
+            .defaultValue("/write")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor REMOTE_WRITE_PORT = new PropertyDescriptor
+            .Builder().name("Remote Write Port")
+            .displayName("Remote Write URL port")
+            .description("The port used in the remote_write url property in Prometheus")
+            .required(true)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_BATCH_METRICS = new PropertyDescriptor
+            .Builder().name("Max Batch Metrics")
+            .displayName("Max Batch Metrics")
+            .description("Number of max metrics in one FlowFile. When is not set, a FlowFile per metric is emitted")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All content that is received is routed to the 'success' relationship")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("All content that is received is routed to the 'failure' relationship")
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static Server serverEndpoint;
+
+    // Maximum threads spawn, defaults to 200 max, min 8 threads
+    private static final int JETTY_MAX_THREADS = 500;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
+        descriptors.add(REMOTE_WRITE_CONTEXT);
+        descriptors.add(REMOTE_WRITE_PORT);
+        descriptors.add(MAX_BATCH_METRICS);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<Relationship>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() throws Exception {
+        getLogger().debug("onUnscheduled called");
+        if (serverEndpoint != null) {
+            serverEndpoint.stop();
+        }
+    }
+
+    @OnShutdown
+    public void onShutdown() throws Exception {
+        getLogger().debug("onShutdown called");
+        if (serverEndpoint != null) {
+            serverEndpoint.stop();
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final int port = context.getProperty(REMOTE_WRITE_PORT).asInteger();
+        final String contextPath = context.getProperty(REMOTE_WRITE_CONTEXT).getValue();
+        final int maxBatch;
+
+        if (context.getProperty(MAX_BATCH_METRICS).isSet()) {
+            maxBatch = context.getProperty(MAX_BATCH_METRICS).asInteger();
+        } else {
+           maxBatch = 0;
+        }
+
+        getLogger().debug("onTrigger called");
+
+        // Internal Jetty thread pool tuning
+        QueuedThreadPool threadPool = new QueuedThreadPool();
+        threadPool.setMaxThreads(JETTY_MAX_THREADS);
+        serverEndpoint = new Server(threadPool);
+
+        ServerConnector connector = new ServerConnector(serverEndpoint);
+        connector.setPort(port);
+        serverEndpoint.addConnector(connector);
+
+        // Setup only one handler serving one context path
+        ContextHandler contextHandler = new ContextHandler();
+        contextHandler.setContextPath(contextPath);
+        contextHandler.setAllowNullPathInfo(true);
+        contextHandler.setHandler(new PrometheusHandler(context, session, maxBatch));
+        serverEndpoint.setHandler(contextHandler);
+
+        try {
+            serverEndpoint.start();
+            serverEndpoint.join();
+        } catch (Exception e) {
+            context.yield();
+            e.printStackTrace();
+        }
+    }
+
+    private class PrometheusHandler extends AbstractHandler {
+        private final JsonFormat.Printer JSON_PRINTER = JsonFormat.printer();
+        private ProcessSession session;
+        private ProcessContext context;
+        private FlowFile flowfile;
+        private int maxBatch;
+        private List<Metrics> metricList = new ArrayList<>();
+
+        public PrometheusHandler(ProcessContext context, ProcessSession session, int maxBatch) {
+            super();
+            this.session = session;
+            this.context = context;
+            this.maxBatch = maxBatch;
+        }
+
+        public void commitFlowFile(FlowFile flowFile, ProcessSession session, HttpServletRequest request){
+            session.putAttribute(flowfile, "mime.type","application/json");
+            session.transfer(flowfile, REL_SUCCESS);
+            session.getProvenanceReporter().receive(flowfile, request.getRequestURI());
+            session.commit();
+            getLogger().debug("SUCCESS relation FlowFile: {}.", new Object[]{flowfile.getId()});
+        }
+
+        @Override
+        public void handle(String target,
+                           Request baseRequest,
+                           HttpServletRequest request,
+                           HttpServletResponse response) throws IOException,ServletException {
+
+            // Uncompress the request on the fly: Snappy compressed protocol buffer message.
+            try (SnappyInputStream is = new SnappyInputStream(baseRequest.getInputStream())) {
+
+                if (is == null) {
+                     getLogger().error("InputStream is null");
+                }
+
+                ServletOutputStream responseOut = response.getOutputStream();
+                response.setStatus(HttpServletResponse.SC_OK);
+
+                WriteRequest writeRequest = WriteRequest.parseFrom(is);
+
+                for (Types.TimeSeries timeSeries: writeRequest.getTimeseriesList()) {
+                    List<MetricLabel> labelsList = new ArrayList<>();
+                    List<MetricSample> sampleList = new ArrayList<>();
+                    Metrics metrics = new Metrics();
+                    Gson gson = new Gson();
+
+                    for (Types.Label labelItem: timeSeries.getLabelsList()) {
+                        MetricLabel metricsLabel = new MetricLabel();
+                        metricsLabel.name = labelItem.getName();
+                        metricsLabel.value = labelItem.getValue();
+                        labelsList.add(metricsLabel);
+                    }
+                    for (Types.Sample sample: timeSeries.getSamplesList()) {
+                        MetricSample metricSample = new MetricSample();
+                        metricSample.sample = Double.toString(sample.getValue());
+                        metricSample.timestamp = Long.toString(sample.getTimestamp());
+                        sampleList.add(metricSample);
+                    }
+                    metrics.metricLabels= labelsList;
+                    metrics.metricSamples = sampleList;
+
+                    if (maxBatch == 0 || maxBatch == 1) {
+                        flowfile = session.create();
+                        session.write(flowfile, new OutputStreamCallback() {
+                            @Override
+                            public void process(OutputStream out) throws IOException {
+                                out.write(gson.toJson(metrics).getBytes());
+                                out.flush();
+                                out.close();
+                            }
+                        });
+                        getLogger().debug("Batch mode disabled written FlowFile: {}.", new Object[]{flowfile.getId()});
+                        commitFlowFile(flowfile, session, request);
+                    } else if (metricList.size() < maxBatch) {
+                        metricList.add(metrics);
+                    }
+
+                    if (metricList.size() == maxBatch) {
+                        flowfile = session.create();
+                        session.write(flowfile, new OutputStreamCallback() {
+                            @Override
+                            public void process(OutputStream out) throws IOException {
+                                out.write(gson.toJson(metricList).getBytes());
+                                out.flush();
+                                out.close();
+                            }
+                        });
+                        getLogger().debug("Batch mode enabled, writing {} metrics in FlowFile {}.",
+                                new Object[]{metricList.size(), flowfile.getId()});
+                        commitFlowFile(flowfile, session, request);

Review comment:
       Can `metricList.size()` equal 0 or 1? If so it appears the flowfile would get transferred twice (which would cause an error)

##########
File path: nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-processors/pom.xml
##########
@@ -0,0 +1,133 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-prometheus-bundle</artifactId>
+        <version>1.13.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-prometheus-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <properties>
+        <jetty.version>9.3.9.v20160517</jetty.version>

Review comment:
       Is there a reason to override the `jetty.version` property in the root POM? NiFi uses a later Jetty version than this




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org