Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/12/02 16:38:56 UTC

[GitHub] [nifi] mattyb149 commented on a change in pull request #4651: NIFI-7988 Prometheus Remote Write Processor

mattyb149 commented on a change in pull request #4651:
URL: https://github.com/apache/nifi/pull/4651#discussion_r534304482



##########
File path: nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-processors/pom.xml
##########
@@ -0,0 +1,133 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-prometheus-bundle</artifactId>
+        <version>1.13.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-prometheus-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <properties>
+        <jetty.version>9.3.9.v20160517</jetty.version>
+        <protobuf.version>3.13.0</protobuf.version>
+        <snappy.version>1.1.7.1</snappy.version>
+        <maven.protobuf.version>0.6.1</maven.protobuf.version>
+        <maven.os.version>1.6.2</maven.os.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java-util</artifactId>
+            <version>${protobuf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>

Review comment:
       This needs an entry in the NOTICE file for the NAR; see examples in other NARs (such as nifi-avro-nar).
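For reference, a snappy-java entry in other NiFi NAR NOTICE files takes roughly the following shape. The exact wording and URL should be copied from an existing NOTICE (e.g. in nifi-avro-nar or nifi-hadoop-libraries-nar) rather than from this sketch:

```
(ASLv2) Snappy Java
    The following NOTICE information applies:
      This product includes software developed by Google
       Snappy: http://code.google.com/p/snappy/ (New BSD License)
```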

##########
File path: nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-processors/pom.xml
##########
@@ -0,0 +1,133 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-prometheus-bundle</artifactId>
+        <version>1.13.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-prometheus-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <properties>
+        <jetty.version>9.3.9.v20160517</jetty.version>
+        <protobuf.version>3.13.0</protobuf.version>
+        <snappy.version>1.1.7.1</snappy.version>
+        <maven.protobuf.version>0.6.1</maven.protobuf.version>
+        <maven.os.version>1.6.2</maven.os.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java-util</artifactId>
+            <version>${protobuf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>${snappy.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-server</artifactId>
+            <version>${jetty.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-servlet</artifactId>
+            <version>${jetty.version}</version>
+        </dependency>
+       <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-client</artifactId>
+            <version>${jetty.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <extensions>
+          <extension>
+            <groupId>kr.motd.maven</groupId>
+            <artifactId>os-maven-plugin</artifactId>
+            <version>${maven.os.version}</version>
+          </extension>
+        </extensions>
+        <plugins>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+                <version>${maven.protobuf.version}</version>
+                <configuration>
+                    <protocArtifact>com.google.protobuf:protoc:3.13.0:exe:${os.detected.classifier}</protocArtifact>
+                    <protoSourceRoot>${basedir}/src/main/resources/proto</protoSourceRoot>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>test-compile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <exclude>**/src/main/resources/proto/gogoproto/gogo.proto</exclude>

Review comment:
       gogo.proto has a license header; does it not pass the Rat check?

##########
File path: nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-processors/src/main/java/org/apache/nifi/processors/prometheus/PrometheusRemoteWrite.java
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.prometheus;
+
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import com.google.gson.Gson;
+import com.google.protobuf.util.JsonFormat;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.xerial.snappy.SnappyInputStream;
+import prometheus.Remote.WriteRequest;
+import prometheus.Types;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({ "prometheus", "metrics", "adapter", "remote write" })
+@CapabilityDescription("Listen for incoming samples from Prometheus." +
+        " Implements a remote endpoint adapter for writing Prometheus" +
+        " samples, primarily intended for metrics long term storage")
+@WritesAttributes({@WritesAttribute(attribute = "mime.type", description = "This is always application/json.")})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+public class PrometheusRemoteWrite extends AbstractProcessor {
+
+    public static final PropertyDescriptor REMOTE_WRITE_CONTEXT = new PropertyDescriptor
+            .Builder().name("Remote Write Context")
+            .displayName("Remote Write URL context")
+            .description("The context used in the remote_write url property in Prometheus")
+            .required(true)
+            .defaultValue("/write")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor REMOTE_WRITE_PORT = new PropertyDescriptor
+            .Builder().name("Remote Write Port")
+            .displayName("Remote Write URL port")
+            .description("The port used in the remote_write url property in Prometheus")
+            .required(true)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_BATCH_METRICS = new PropertyDescriptor
+            .Builder().name("Max Batch Metrics")
+            .displayName("Max Batch Metrics")
+            .description("Number of max metrics in one FlowFile. When is not set, a FlowFile per metric is emitted")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All content that is received is routed to the 'success' relationship")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("All content that is received is routed to the 'failure' relationship")
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static Server serverEndpoint;
+
+    // Maximum threads spawn, defaults to 200 max, min 8 threads
+    private static final int JETTY_MAX_THREADS = 500;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
+        descriptors.add(REMOTE_WRITE_CONTEXT);
+        descriptors.add(REMOTE_WRITE_PORT);
+        descriptors.add(MAX_BATCH_METRICS);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<Relationship>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() throws Exception {
+        getLogger().debug("onUnscheduled called");
+        if (serverEndpoint != null) {
+            serverEndpoint.stop();
+        }
+    }
+
+    @OnShutdown
+    public void onShutdown() throws Exception {
+        getLogger().debug("onShutdown called");
+        if (serverEndpoint != null) {
+            serverEndpoint.stop();
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final int port = context.getProperty(REMOTE_WRITE_PORT).asInteger();
+        final String contextPath = context.getProperty(REMOTE_WRITE_CONTEXT).getValue();
+        final int maxBatch;
+
+        if (context.getProperty(MAX_BATCH_METRICS).isSet()) {
+            maxBatch = context.getProperty(MAX_BATCH_METRICS).asInteger();
+        } else {
+           maxBatch = 0;
+        }
+
+        getLogger().debug("onTrigger called");
+
+        // Internal Jetty thread pool tuning
+        QueuedThreadPool threadPool = new QueuedThreadPool();
+        threadPool.setMaxThreads(JETTY_MAX_THREADS);
+        serverEndpoint = new Server(threadPool);
+
+        ServerConnector connector = new ServerConnector(serverEndpoint);
+        connector.setPort(port);
+        serverEndpoint.addConnector(connector);
+
+        // Setup only one handler serving one context path
+        ContextHandler contextHandler = new ContextHandler();
+        contextHandler.setContextPath(contextPath);
+        contextHandler.setAllowNullPathInfo(true);
+        contextHandler.setHandler(new PrometheusHandler(context, session, maxBatch));
+        serverEndpoint.setHandler(contextHandler);
+
+        try {
+            serverEndpoint.start();
+            serverEndpoint.join();
+        } catch (Exception e) {
+            context.yield();
+            e.printStackTrace();
+        }
+    }
+
+    private class PrometheusHandler extends AbstractHandler {
+        private final JsonFormat.Printer JSON_PRINTER = JsonFormat.printer();
+        private ProcessSession session;
+        private ProcessContext context;
+        private FlowFile flowfile;
+        private int maxBatch;
+        private List<Metrics> metricList = new ArrayList<>();
+
+        public PrometheusHandler(ProcessContext context, ProcessSession session, int maxBatch) {
+            super();
+            this.session = session;
+            this.context = context;
+            this.maxBatch = maxBatch;
+        }
+
+        public void commitFlowFile(FlowFile flowFile, ProcessSession session, HttpServletRequest request){
+            session.putAttribute(flowfile, "mime.type","application/json");
+            session.transfer(flowfile, REL_SUCCESS);
+            session.getProvenanceReporter().receive(flowfile, request.getRequestURI());
+            session.commit();
+            getLogger().debug("SUCCESS relation FlowFile: {}.", new Object[]{flowfile.getId()});
+        }
+
+        @Override
+        public void handle(String target,
+                           Request baseRequest,
+                           HttpServletRequest request,
+                           HttpServletResponse response) throws IOException,ServletException {
+
+            // Uncompress the request on the fly: Snappy compressed protocol buffer message.
+            try (SnappyInputStream is = new SnappyInputStream(baseRequest.getInputStream())) {
+
+                if (is == null) {
+                     getLogger().error("InputStream is null");
+                }
+
+                ServletOutputStream responseOut = response.getOutputStream();
+                response.setStatus(HttpServletResponse.SC_OK);
+
+                WriteRequest writeRequest = WriteRequest.parseFrom(is);
+
+                for (Types.TimeSeries timeSeries: writeRequest.getTimeseriesList()) {
+                    List<MetricLabel> labelsList = new ArrayList<>();
+                    List<MetricSample> sampleList = new ArrayList<>();
+                    Metrics metrics = new Metrics();
+                    Gson gson = new Gson();

Review comment:
       IMO it would be better to use a RecordWriter (specified by the user) for output rather than only JSON; that way the user can decide which format (JSON, Avro, CSV, XML, etc.) to output the metrics in.
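A sketch of what the suggested property might look like, in the style of other record-oriented NiFi processors. This is illustrative only, not part of the PR: the property name and description are hypothetical, and the fragment assumes the standard `RecordSetWriterFactory` controller-service interface from `nifi-record-serialization-service-api`:

```java
// Illustrative only: a user-selected Record Writer, so the output format
// (JSON, Avro, CSV, XML, ...) is chosen by the configured controller service
// rather than hard-coded to JSON.
public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
        .name("record-writer")
        .displayName("Record Writer")
        .description("The Record Writer to use for serializing the received metrics")
        .identifiesControllerService(RecordSetWriterFactory.class)
        .required(true)
        .build();
```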

##########
File path: nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-processors/src/main/java/org/apache/nifi/processors/prometheus/PrometheusRemoteWrite.java
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.prometheus;
+
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import com.google.gson.Gson;

Review comment:
       Is there a compelling reason to use Gson here? Most components in NiFi use Jackson, as it is faster.
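A minimal sketch of the suggested swap, assuming Jackson's `ObjectMapper` from `com.fasterxml.jackson.databind` (the surrounding names are hypothetical). Since `ObjectMapper` is thread-safe once configured, a single shared instance can replace the per-request `new Gson()` in the handler:

```java
// Hypothetical Gson-to-Jackson swap: one shared, thread-safe mapper instead
// of constructing a new serializer on every incoming write request.
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private String toJson(final Object metrics) throws JsonProcessingException {
    return OBJECT_MAPPER.writeValueAsString(metrics);
}
```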

##########
File path: nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-processors/src/main/java/org/apache/nifi/processors/prometheus/PrometheusRemoteWrite.java
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.prometheus;
+
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import com.google.gson.Gson;
+import com.google.protobuf.util.JsonFormat;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.xerial.snappy.SnappyInputStream;
+import prometheus.Remote.WriteRequest;
+import prometheus.Types;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({ "prometheus", "metrics", "adapter", "remote write" })
+@CapabilityDescription("Listen for incoming samples from Prometheus." +
+        " Implements a remote endpoint adapter for writing Prometheus" +
+        " samples, primarily intended for metrics long term storage")
+@WritesAttributes({@WritesAttribute(attribute = "mime.type", description = "This is always application/json.")})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+public class PrometheusRemoteWrite extends AbstractProcessor {
+
+    public static final PropertyDescriptor REMOTE_WRITE_CONTEXT = new PropertyDescriptor
+            .Builder().name("Remote Write Context")
+            .displayName("Remote Write URL context")
+            .description("The context used in the remote_write url property in Prometheus")
+            .required(true)
+            .defaultValue("/write")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor REMOTE_WRITE_PORT = new PropertyDescriptor
+            .Builder().name("Remote Write Port")
+            .displayName("Remote Write URL port")
+            .description("The port used in the remote_write url property in Prometheus")
+            .required(true)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_BATCH_METRICS = new PropertyDescriptor
+            .Builder().name("Max Batch Metrics")
+            .displayName("Max Batch Metrics")
+            .description("Number of max metrics in one FlowFile. When is not set, a FlowFile per metric is emitted")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All content that is received is routed to the 'success' relationship")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("All content that is received is routed to the 'failure' relationship")
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static Server serverEndpoint;
+
+    // Maximum threads spawn, defaults to 200 max, min 8 threads
+    private static final int JETTY_MAX_THREADS = 500;

Review comment:
       Should this be a configurable property for the user to set?
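If it were made configurable, the property might look like the sketch below. The name, default, and description are hypothetical, not from the PR; the default of 200 mirrors Jetty's own `QueuedThreadPool` default mentioned in the code comment:

```java
// Illustrative only: exposing the embedded Jetty thread cap as a processor
// property instead of the hard-coded JETTY_MAX_THREADS constant.
public static final PropertyDescriptor MAX_JETTY_THREADS = new PropertyDescriptor.Builder()
        .name("Max Jetty Threads")
        .displayName("Max Jetty Threads")
        .description("Maximum number of threads the embedded Jetty server may use")
        .required(false)
        .defaultValue("200")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build();
```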

##########
File path: nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-processors/src/test/java/org/apache/nifi/processors/prometheus/PrometheusRemoteWriteBatchTest.java
##########
@@ -0,0 +1,147 @@
+/*

Review comment:
       Even though GitHub Actions passed, these tests fail for me locally with a "No route to host" exception. The host/port should be fixed if a server is started for the test; alternatively, if a Prometheus instance is required to run the test, it should be an integration test instead (ending in `IT` rather than `Test`).
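One common way to keep such a test host-independent is to bind the embedded server to an ephemeral port chosen by the OS rather than a hard-coded host/port. A self-contained sketch (the class and method names are illustrative, not from the PR's test code):

```java
import java.io.IOException;
import java.net.ServerSocket;

public class FreePortExample {

    // Binding to port 0 asks the OS for any free ephemeral port; closing the
    // probe socket immediately frees it for the server under test to claim.
    static int findFreePort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }

    public static void main(String[] args) throws IOException {
        int port = findFreePort();
        // A valid TCP port is always in the 1..65535 range
        System.out.println(port >= 1 && port <= 65535);
    }
}
```

The test would then pass the discovered port to the processor's `Remote Write Port` property instead of assuming a fixed address.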

##########
File path: nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-processors/src/main/java/org/apache/nifi/processors/prometheus/PrometheusRemoteWrite.java
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.prometheus;
+
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import com.google.gson.Gson;
+import com.google.protobuf.util.JsonFormat;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.xerial.snappy.SnappyInputStream;
+import prometheus.Remote.WriteRequest;
+import prometheus.Types;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({ "prometheus", "metrics", "adapter", "remote write" })
+@CapabilityDescription("Listen for incoming samples from Prometheus." +
+        " Implements a remote endpoint adapter for writing Prometheus" +
+        " samples, primarily intended for metrics long term storage")
+@WritesAttributes({@WritesAttribute(attribute = "mime.type", description = "This is always application/json.")})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+public class PrometheusRemoteWrite extends AbstractProcessor {
+
+    public static final PropertyDescriptor REMOTE_WRITE_CONTEXT = new PropertyDescriptor
+            .Builder().name("Remote Write Context")
+            .displayName("Remote Write URL context")
+            .description("The context used in the remote_write url property in Prometheus")
+            .required(true)
+            .defaultValue("/write")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor REMOTE_WRITE_PORT = new PropertyDescriptor
+            .Builder().name("Remote Write Port")
+            .displayName("Remote Write URL port")
+            .description("The port used in the remote_write url property in Prometheus")
+            .required(true)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_BATCH_METRICS = new PropertyDescriptor
+            .Builder().name("Max Batch Metrics")
+            .displayName("Max Batch Metrics")
+            .description("Maximum number of metrics per FlowFile. When not set, one FlowFile is emitted per metric")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All content that is received is routed to the 'success' relationship")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Content that cannot be processed is routed to the 'failure' relationship")
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static Server serverEndpoint;
+
+    // Maximum number of threads in Jetty's pool (QueuedThreadPool defaults: max 200, min 8)
+    private static final int JETTY_MAX_THREADS = 500;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
+        descriptors.add(REMOTE_WRITE_CONTEXT);
+        descriptors.add(REMOTE_WRITE_PORT);
+        descriptors.add(MAX_BATCH_METRICS);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<Relationship>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() throws Exception {
+        getLogger().debug("onUnscheduled called");
+        if (serverEndpoint != null) {
+            serverEndpoint.stop();
+        }
+    }
+
+    @OnShutdown
+    public void onShutdown() throws Exception {
+        getLogger().debug("onShutdown called");
+        if (serverEndpoint != null) {
+            serverEndpoint.stop();
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final int port = context.getProperty(REMOTE_WRITE_PORT).asInteger();
+        final String contextPath = context.getProperty(REMOTE_WRITE_CONTEXT).getValue();
+        final int maxBatch;
+
+        if (context.getProperty(MAX_BATCH_METRICS).isSet()) {
+            maxBatch = context.getProperty(MAX_BATCH_METRICS).asInteger();
+        } else {
+           maxBatch = 0;
+        }
+
+        getLogger().debug("onTrigger called");
+
+        // Internal Jetty thread pool tuning
+        QueuedThreadPool threadPool = new QueuedThreadPool();
+        threadPool.setMaxThreads(JETTY_MAX_THREADS);
+        serverEndpoint = new Server(threadPool);
+
+        ServerConnector connector = new ServerConnector(serverEndpoint);
+        connector.setPort(port);
+        serverEndpoint.addConnector(connector);
+
+        // Set up a single handler serving one context path
+        ContextHandler contextHandler = new ContextHandler();
+        contextHandler.setContextPath(contextPath);
+        contextHandler.setAllowNullPathInfo(true);
+        contextHandler.setHandler(new PrometheusHandler(context, session, maxBatch));
+        serverEndpoint.setHandler(contextHandler);
+
+        try {
+            serverEndpoint.start();
+            serverEndpoint.join();
+        } catch (Exception e) {
+            context.yield();
+            getLogger().error("Failed to start Jetty endpoint", e);
+        }
+    }
+
+    private class PrometheusHandler extends AbstractHandler {
+        private final JsonFormat.Printer JSON_PRINTER = JsonFormat.printer();
+        private ProcessSession session;
+        private ProcessContext context;
+        private FlowFile flowfile;
+        private int maxBatch;
+        private List<Metrics> metricList = new ArrayList<>();
+
+        public PrometheusHandler(ProcessContext context, ProcessSession session, int maxBatch) {
+            super();
+            this.session = session;
+            this.context = context;
+            this.maxBatch = maxBatch;
+        }
+
+        public void commitFlowFile(FlowFile flowFile, ProcessSession session, HttpServletRequest request){
+            // putAttribute returns a new FlowFile reference; it must be reassigned
+            flowFile = session.putAttribute(flowFile, "mime.type", "application/json");
+            session.transfer(flowFile, REL_SUCCESS);
+            session.getProvenanceReporter().receive(flowFile, request.getRequestURI());
+            session.commit();
+            getLogger().debug("SUCCESS relation FlowFile: {}.", new Object[]{flowFile.getId()});
+        }
+
+        @Override
+        public void handle(String target,
+                           Request baseRequest,
+                           HttpServletRequest request,
+                           HttpServletResponse response) throws IOException, ServletException {
+
+            // Decompress the request on the fly: a Snappy-compressed protocol buffer message.
+            // (The SnappyInputStream constructor never returns null, so no null check is needed.)
+            try (SnappyInputStream is = new SnappyInputStream(baseRequest.getInputStream())) {
+
+                response.setStatus(HttpServletResponse.SC_OK);
+
+                WriteRequest writeRequest = WriteRequest.parseFrom(is);
+
+                for (Types.TimeSeries timeSeries: writeRequest.getTimeseriesList()) {
+                    List<MetricLabel> labelsList = new ArrayList<>();
+                    List<MetricSample> sampleList = new ArrayList<>();
+                    Metrics metrics = new Metrics();
+                    Gson gson = new Gson();
+
+                    for (Types.Label labelItem: timeSeries.getLabelsList()) {
+                        MetricLabel metricsLabel = new MetricLabel();
+                        metricsLabel.name = labelItem.getName();
+                        metricsLabel.value = labelItem.getValue();
+                        labelsList.add(metricsLabel);
+                    }
+                    for (Types.Sample sample: timeSeries.getSamplesList()) {
+                        MetricSample metricSample = new MetricSample();
+                        metricSample.sample = Double.toString(sample.getValue());
+                        metricSample.timestamp = Long.toString(sample.getTimestamp());
+                        sampleList.add(metricSample);
+                    }
+                    metrics.metricLabels = labelsList;
+                    metrics.metricSamples = sampleList;
+
+                    if (maxBatch == 0 || maxBatch == 1) {
+                        flowfile = session.create();
+                        session.write(flowfile, new OutputStreamCallback() {
+                            @Override
+                            public void process(OutputStream out) throws IOException {
+                                out.write(gson.toJson(metrics).getBytes());
+                                out.flush();
+                                out.close();
+                            }
+                        });
+                        getLogger().debug("Batch mode disabled written FlowFile: {}.", new Object[]{flowfile.getId()});
+                        commitFlowFile(flowfile, session, request);
+                    } else if (metricList.size() < maxBatch) {
+                        metricList.add(metrics);
+                    }
+
+                    if (metricList.size() == maxBatch) {
+                        flowfile = session.create();
+                        session.write(flowfile, new OutputStreamCallback() {
+                            @Override
+                            public void process(OutputStream out) throws IOException {
+                                out.write(gson.toJson(metricList).getBytes());
+                                out.flush();
+                                out.close();
+                            }
+                        });
+                        getLogger().debug("Batch mode enabled, writing {} metrics in FlowFile {}.",
+                                new Object[]{metricList.size(), flowfile.getId()});
+                        commitFlowFile(flowfile, session, request);

Review comment:
       Can `metricList.size()` equal 0 or 1? If so, it appears the FlowFile would get transferred twice (which would cause an error).
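One way to avoid the double transfer (a sketch, not the processor's actual code): guard the batch branch so it only applies when `maxBatch > 1`, and clear the buffer after each emission. The `BatchEmitter` class and all names below are hypothetical, distilled from the handler's batching logic:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the handler's batching logic. With maxBatch <= 1 the
// buffer is never touched, so the batch branch cannot fire for the same item;
// clearing after each emission prevents re-emitting a completed batch.
class BatchEmitter<T> {
    private final int maxBatch;
    private final List<T> buffer = new ArrayList<>();
    private final Consumer<List<T>> emit;

    BatchEmitter(int maxBatch, Consumer<List<T>> emit) {
        this.maxBatch = maxBatch;
        this.emit = emit;
    }

    void add(T item) {
        if (maxBatch <= 1) {
            // Unbatched mode: exactly one emission per item, then return,
            // so the size check below can never see this item.
            emit.accept(List.of(item));
            return;
        }
        buffer.add(item);
        if (buffer.size() == maxBatch) {
            emit.accept(new ArrayList<>(buffer));
            buffer.clear(); // reset so the same batch is not emitted twice
        }
    }
}
```

In the processor this would correspond to making the second `if` an `else if (metricList.size() == maxBatch)` guarded by `maxBatch > 1`, and calling `metricList.clear()` after `commitFlowFile`.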

##########
File path: nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-processors/pom.xml
##########
@@ -0,0 +1,133 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-prometheus-bundle</artifactId>
+        <version>1.13.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-prometheus-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <properties>
+        <jetty.version>9.3.9.v20160517</jetty.version>

Review comment:
       Is there a reason to override the `jetty.version` property in the root POM? NiFi uses a later Jetty version than this
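If the pin is not deliberate, one option (a sketch, assuming the NiFi root POM manages `jetty.version` in its `dependencyManagement`) is to drop the override so the bundle inherits the managed version:

```xml
<!-- Sketch: inherit Jetty from the root POM instead of pinning 9.3.9 -->
<properties>
    <!-- no jetty.version override here: Jetty dependencies resolve through
         the parent's dependencyManagement, keeping this bundle in sync with
         the rest of NiFi -->
    <protobuf.version>3.13.0</protobuf.version>
</properties>
```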




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org