You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2020/06/12 20:51:10 UTC

[nifi-minifi] 01/02: MINIFI-492 - Generic reporting tasks

This is an automated email from the ASF dual-hosted git repository.

aldrin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi.git

commit e44da568d8c099cda7b7fa4db6722d8b66aa9562
Author: Nghia Le <mi...@gmail.com>
AuthorDate: Mon May 25 22:46:14 2020 +0200

    MINIFI-492 - Generic reporting tasks
    
    This closes #187.
---
 .../minifi/bootstrap/util/ConfigTransformer.java   |  68 +++----
 .../bootstrap/util/ConfigTransformerTest.java      |  26 +++
 .../src/test/resources/config-reporting-task.yml   | 218 +++++++++++++++++++++
 .../nifi/minifi/commons/schema/ConfigSchema.java   |  15 ++
 .../commons/schema/ProvenanceReportingSchema.java  |  42 +++-
 .../minifi/commons/schema/ReportingSchema.java     |  98 +++++++++
 .../commons/schema/common/CommonPropertyKeys.java  |   1 +
 7 files changed, 426 insertions(+), 42 deletions(-)

diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
index da4af51..677080c 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
@@ -38,6 +38,7 @@ import org.apache.nifi.minifi.commons.schema.ProvenanceReportingSchema;
 import org.apache.nifi.minifi.commons.schema.ProvenanceRepositorySchema;
 import org.apache.nifi.minifi.commons.schema.RemotePortSchema;
 import org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema;
+import org.apache.nifi.minifi.commons.schema.ReportingSchema;
 import org.apache.nifi.minifi.commons.schema.SecurityPropertiesSchema;
 import org.apache.nifi.minifi.commons.schema.SensitivePropsSchema;
 import org.apache.nifi.minifi.commons.schema.SwapSchema;
@@ -81,7 +82,6 @@ public final class ConfigTransformer {
     // Underlying version of NIFI will be using
     public static final String NIFI_VERSION = "1.8.0";
     public static final String ROOT_GROUP = "Root-Group";
-    public static final String DEFAULT_PROV_REPORTING_TASK_CLASS = "org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask";
     public static final String NIFI_VERSION_KEY = "nifi.version";
 
     public static final Logger logger = LoggerFactory.getLogger(ConfigTransformer.class);
@@ -178,7 +178,7 @@ public final class ConfigTransformer {
             ProvenanceRepositorySchema provenanceRepositorySchema = configSchema.getProvenanceRepositorySchema();
 
             OrderedProperties orderedProperties = new OrderedProperties();
-            orderedProperties.setProperty(NIFI_VERSION_KEY, NIFI_VERSION,"# Core Properties #" + System.lineSeparator());
+            orderedProperties.setProperty(NIFI_VERSION_KEY, NIFI_VERSION, "# Core Properties #" + System.lineSeparator());
             orderedProperties.setProperty("nifi.flow.configuration.file", "./conf/flow.xml.gz");
             orderedProperties.setProperty("nifi.flow.configuration.archive.enabled", "false");
             orderedProperties.setProperty("nifi.flow.configuration.archive.dir", "./conf/archive/");
@@ -288,7 +288,7 @@ public final class ConfigTransformer {
         }
     }
 
-    protected static DOMSource createFlowXml(ConfigSchema configSchema) throws IOException, ConfigurationChangeException, ConfigTransformerException{
+    protected static DOMSource createFlowXml(ConfigSchema configSchema) throws IOException, ConfigurationChangeException, ConfigTransformerException {
         try {
             // create a new, empty document
             final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
@@ -317,9 +317,10 @@ public final class ConfigTransformer {
             addProcessGroup(doc, element, processGroupSchema, new ParentGroupIdResolver(processGroupSchema));
 
             SecurityPropertiesSchema securityProperties = configSchema.getSecurityProperties();
-            if (securityProperties.useSSL()) {
+            boolean useSSL = securityProperties.useSSL();
+            if (useSSL) {
                 Element controllerServicesNode = doc.getElementById("controllerServices");
-                if(controllerServicesNode == null) {
+                if (controllerServicesNode == null) {
                     controllerServicesNode = doc.createElement("controllerServices");
                 }
 
@@ -327,17 +328,27 @@ public final class ConfigTransformer {
                 addSSLControllerService(controllerServicesNode, securityProperties);
             }
 
+            List<ReportingSchema> reportingTasks = configSchema.getReportingTasksSchema();
             ProvenanceReportingSchema provenanceProperties = configSchema.getProvenanceReportingProperties();
             if (provenanceProperties != null) {
+                provenanceProperties.setSSL(useSSL);
+                ReportingSchema provenance = provenanceProperties.convert();
+                provenance.setId("Provenance-Reporting");
+                provenance.setName("Site-To-Site-Provenance-Reporting");
+                reportingTasks.add(provenance);
+            }
+            if (reportingTasks != null) {
                 final Element reportingTasksNode = doc.createElement("reportingTasks");
                 rootNode.appendChild(reportingTasksNode);
-                addProvenanceReportingTask(reportingTasksNode, configSchema);
+                for (ReportingSchema task : reportingTasks) {
+                    addReportingTask(reportingTasksNode, task);
+                }
             }
 
             return new DOMSource(doc);
         } catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException e) {
             throw new ConfigTransformerException(e);
-        } catch (Exception e){
+        } catch (Exception e) {
             throw new ConfigTransformerException("Failed to parse the config YAML while writing the top level of the flow xml", e);
         }
     }
@@ -384,7 +395,7 @@ public final class ConfigTransformer {
             addConfiguration(serviceElement, attributes);
 
             String annotationData = controllerServiceSchema.getAnnotationData();
-            if(annotationData != null && !annotationData.isEmpty()) {
+            if (annotationData != null && !annotationData.isEmpty()) {
                 addTextElement(element, "annotationData", annotationData);
             }
 
@@ -480,7 +491,7 @@ public final class ConfigTransformer {
             addTextElement(element, "runDurationNanos", String.valueOf(processorConfig.getRunDurationNanos()));
 
             String annotationData = processorConfig.getAnnotationData();
-            if(annotationData != null && !annotationData.isEmpty()) {
+            if (annotationData != null && !annotationData.isEmpty()) {
                 addTextElement(element, "annotationData", annotationData);
             }
 
@@ -508,34 +519,23 @@ public final class ConfigTransformer {
         addPosition(element);
     }
 
-    protected static void addProvenanceReportingTask(final Element element, ConfigSchema configSchema) throws ConfigurationChangeException {
+    protected static void addReportingTask(final Element parentElement, ReportingSchema reportingSchema) throws ConfigurationChangeException {
         try {
-            ProvenanceReportingSchema provenanceProperties = configSchema.getProvenanceReportingProperties();
-            final Element taskElement = element.getOwnerDocument().createElement("reportingTask");
-            addTextElement(taskElement, "id", "Provenance-Reporting");
-            addTextElement(taskElement, "name", "Site-To-Site-Provenance-Reporting");
-            addTextElement(taskElement, "comment", provenanceProperties.getComment());
-            addTextElement(taskElement, "class", DEFAULT_PROV_REPORTING_TASK_CLASS);
-            addTextElement(taskElement, "schedulingPeriod", provenanceProperties.getSchedulingPeriod());
-            addTextElement(taskElement, "scheduledState", "RUNNING");
-            addTextElement(taskElement, "schedulingStrategy", provenanceProperties.getSchedulingStrategy());
+            final Document doc = parentElement.getOwnerDocument();
+            final Element element = doc.createElement("reportingTask");
+            parentElement.appendChild(element);
 
-            Map<String, Object> attributes = new HashMap<>();
-            attributes.put("Destination URL", provenanceProperties.getDestinationUrl());
-            attributes.put("Input Port Name", provenanceProperties.getPortName());
-            attributes.put("Instance URL", provenanceProperties.getOriginatingUrl());
-            attributes.put("Compress Events", provenanceProperties.getUseCompression());
-            attributes.put("Batch Size", provenanceProperties.getBatchSize());
-            attributes.put("Communications Timeout", provenanceProperties.getTimeout());
-
-            SecurityPropertiesSchema securityProps = configSchema.getSecurityProperties();
-            if (securityProps.useSSL()) {
-                attributes.put("SSL Context Service", "SSL-Context-Service");
-            }
+            addTextElement(element, "id", reportingSchema.getId());
+            addTextElement(element, "name", reportingSchema.getName());
+            addTextElement(element, "comment", reportingSchema.getComment());
+            addTextElement(element, "class", reportingSchema.getReportingClass());
+            addTextElement(element, "schedulingPeriod", reportingSchema.getSchedulingPeriod());
+            addTextElement(element, "scheduledState", "RUNNING");
+            addTextElement(element, "schedulingStrategy", reportingSchema.getSchedulingStrategy());
 
-            addConfiguration(taskElement, attributes);
+            addConfiguration(element, reportingSchema.getProperties());
 
-            element.appendChild(taskElement);
+            parentElement.appendChild(element);
         } catch (Exception e) {
             throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the Provenance Reporting Task", e);
         }
@@ -725,7 +725,7 @@ public final class ConfigTransformer {
             "# distributed under the License is distributed on an \"AS IS\" BASIS,\n" +
             "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
             "# See the License for the specific language governing permissions and\n" +
-            "# limitations under the License.\n"+
+            "# limitations under the License.\n" +
             "\n";
 
 }
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
index 32a5852..88bc318 100644
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
@@ -29,6 +29,7 @@ import org.apache.nifi.minifi.commons.schema.ProcessGroupSchema;
 import org.apache.nifi.minifi.commons.schema.ProcessorSchema;
 import org.apache.nifi.minifi.commons.schema.RemotePortSchema;
 import org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema;
+import org.apache.nifi.minifi.commons.schema.ReportingSchema;
 import org.apache.nifi.minifi.commons.schema.common.StringUtil;
 import org.apache.nifi.minifi.commons.schema.exception.SchemaLoaderException;
 import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
@@ -123,6 +124,11 @@ public class ConfigTransformerTest {
     }
 
     @Test
+    public void testReportingTasksTransform() throws Exception {
+        testConfigFileTransform("config-reporting-task.yml");
+    }
+
+    @Test
     public void testProcessGroupsTransform() throws Exception {
         testConfigFileTransform("config-process-groups.yml");
     }
@@ -507,6 +513,7 @@ public class ConfigTransformerTest {
         Document document = documentBuilder.parse(new ByteArrayInputStream(outputStream.toByteArray()));
 
         testProcessGroup((Element) xPathFactory.newXPath().evaluate("flowController/rootGroup", document, XPathConstants.NODE), configSchema.getProcessGroupSchema());
+        testReportingTasks((Element) xPathFactory.newXPath().evaluate("flowController/reportingTasks", document, XPathConstants.NODE), configSchema.getReportingTasksSchema());
     }
 
     private void testProcessGroup(Element element, ProcessGroupSchema processGroupSchema) throws XPathExpressionException {
@@ -579,6 +586,25 @@ public class ConfigTransformerTest {
         testProperties(element, processorSchema.getProperties());
     }
 
+    private void testReportingTasks(Element element, List<ReportingSchema> reportingSchemas) throws XPathExpressionException {
+        NodeList taskElements = (NodeList) xPathFactory.newXPath().evaluate("reportingTask", element, XPathConstants.NODESET);
+        assertEquals(reportingSchemas.size(), taskElements.getLength());
+        for (int i = 0; i < taskElements.getLength(); i++) {
+            testReportingTask((Element) taskElements.item(i), reportingSchemas.get(i));
+        }
+    }
+
+    private void testReportingTask(Element element, ReportingSchema reportingSchema) throws XPathExpressionException {
+        assertEquals(reportingSchema.getId(), getText(element, "id"));
+        assertEquals(reportingSchema.getName(), getText(element, "name"));
+        assertEquals(reportingSchema.getComment(), getText(element, "comment"));
+        assertEquals(reportingSchema.getReportingClass(), getText(element, "class"));
+        assertEquals(reportingSchema.getSchedulingPeriod(), getText(element, "schedulingPeriod"));
+        assertEquals(reportingSchema.getSchedulingStrategy(), getText(element, "schedulingStrategy"));
+
+        testProperties(element, reportingSchema.getProperties());
+    }
+
     private void testControllerService(Element element, ControllerServiceSchema controllerServiceSchema) throws XPathExpressionException {
         assertEquals(controllerServiceSchema.getId(), getText(element, "id"));
         assertEquals(controllerServiceSchema.getName(), getText(element, "name"));
diff --git a/minifi-bootstrap/src/test/resources/config-reporting-task.yml b/minifi-bootstrap/src/test/resources/config-reporting-task.yml
new file mode 100644
index 0000000..b746316
--- /dev/null
+++ b/minifi-bootstrap/src/test/resources/config-reporting-task.yml
@@ -0,0 +1,218 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+MiNiFi Config Version: 3
+
+Flow Controller:
+  name: MiNiFi Flow
+  comment:
+
+Core Properties:
+  flow controller graceful shutdown period: 10 sec
+  flow service write delay interval: 500 ms
+  administrative yield duration: 30 sec
+  bored yield duration: 10 millis
+  max concurrent threads: 1
+
+FlowFile Repository:
+  partitions: 256
+  checkpoint interval: 2 mins
+  always sync: false
+  Swap:
+    threshold: 20000
+    in period: 5 sec
+    in threads: 1
+    out period: 5 sec
+    out threads: 4
+
+Content Repository:
+  content claim max appendable size: 10 MB
+  content claim max flow files: 100
+  always sync: false
+
+Component Status Repository:
+  buffer size: 1440
+  snapshot frequency: 1 min
+
+Security Properties:
+  keystore: ''
+  keystore type: ''
+  keystore password: ''
+  key password: ''
+  truststore: ''
+  truststore type: ''
+  truststore password: ''
+  ssl protocol: ''
+  Sensitive Props:
+    key: ''
+    algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
+    provider: BC
+
+Processors:
+  - name: TailAppLog
+    id: c05f74a3-9f8f-3bb7-bced-f4735f962ec8
+    class: org.apache.nifi.processors.standard.TailFile
+    max concurrent tasks: 1
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 10 sec
+    penalization period: 30 sec
+    yield period: 1 sec
+    run duration nanos: 0
+    auto-terminated relationships list:
+    Properties:
+      File to Tail: logs/minifi-app.log
+      Rolling Filename Pattern: minifi-app*
+      Initial Start Position: Beginning of File
+  - name: SplitIntoSingleLines
+    id: afbd6a07-64a0-39b3-953e-65b4678c5177
+    class: org.apache.nifi.processors.standard.SplitText
+    max concurrent tasks: 1
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 0 sec
+    penalization period: 30 sec
+    yield period: 1 sec
+    run duration nanos: 0
+    auto-terminated relationships list:
+      - failure
+      - original
+    Properties:
+      Line Split Count: 1
+      Header Line Count: 0
+      Remove Trailing Newlines: true
+  - name: RouteErrors
+    id: c48394b5-39a5-3bd9-bf01-0bff0e0ad28d
+    class: org.apache.nifi.processors.standard.RouteText
+    max concurrent tasks: 1
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 0 sec
+    penalization period: 30 sec
+    yield period: 1 sec
+    run duration nanos: 0
+    auto-terminated relationships list:
+      - unmatched
+      - original
+    Properties:
+      Routing Strategy: Route to 'matched' if line matches all conditions
+      Matching Strategy: Contains
+      Character Set: UTF-8
+      Ignore Leading/Trailing Whitespace: true
+      Ignore Case: true
+      Grouping Regular Expression:
+      WALFFR: WriteAheadFlowFileRepository
+  - name: PutFile
+    id: 1d80a4d8-86ae-3994-a988-2d7a46949091
+    class: org.apache.nifi.processors.standard.PutFile
+    max concurrent tasks: 1
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 0 sec
+    penalization period: 30 sec
+    yield period: 1 sec
+    run duration nanos: 0
+    auto-terminated relationships list:
+      - failure
+      - success
+    Properties:
+      Directory: ./
+      Conflict Resolution Strategy: replace
+      Create Missing Directories: true
+      Maximum File Count:
+      Last Modified Time:
+      Permissions:
+      Owner:
+      Group:
+
+Connections:
+  - name: TailToSplit
+    id: 0401b747-1dca-31c7-ab4b-cdacf7e6c44b
+    source name: TailAppLog
+    source id: c05f74a3-9f8f-3bb7-bced-f4735f962ec8
+    source relationship names:
+      - success
+    destination name: SplitIntoSingleLines
+    destination id: afbd6a07-64a0-39b3-953e-65b4678c5177
+    max work queue size: 0
+    max work queue data size: 1 MB
+    flowfile expiration: 60 sec
+    queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer
+  - name: SplitToRoute
+    id: 093f9479-e2f0-3eb2-8184-151f42f22f34
+    source name: SplitIntoSingleLines
+    source id: afbd6a07-64a0-39b3-953e-65b4678c5177
+    source relationship names:
+      - splits
+    destination name: RouteErrors
+    destination id: c48394b5-39a5-3bd9-bf01-0bff0e0ad28d
+    max work queue size: 0
+    max work queue data size: 1 MB
+    flowfile expiration: 60 sec
+    queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer
+  - name: RouteToS2S
+    id: aacad63c-7e4c-31c9-888b-111e07c6fd22
+    source name: RouteErrors
+    source id: c48394b5-39a5-3bd9-bf01-0bff0e0ad28d
+    source relationship names:
+      - matched
+    destination name: PutFile
+    destination id: 1d80a4d8-86ae-3994-a988-2d7a46949091
+    max work queue size: 0
+    max work queue data size: 1 MB
+    flowfile expiration: 60 sec
+    queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer
+
+Provenance Reporting:
+  comment: old school config style
+  scheduling strategy: TIMER_DRIVEN
+  scheduling period: 30 sec
+  port name: provenance
+  destination url: http://localhost:8080/
+  originating url: http://${hostname(true)}:8081/nifi
+  use compression: true
+  timeout: 30 secs
+  batch size: 1000
+
+Reporting Tasks:
+  - name: Viet-Nam
+    class: org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask
+    comment: this sentence has three a’s, two c’s, two d’s, twenty-eight e’s, four f’s, four g’s, ten h’s, eight i’s, two l’s, eleven n’s, six o’s, seven r’s, twenty-seven s’s, eighteen t’s, three u’s, five v’s, six w’s, three x’s, and three y’s.
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 30 sec
+    Properties:
+      Input Port Name: provenance
+      Destination URL: http://localhost:8082/
+      Instance URL: http://${hostname(true)}:8081/nifi
+      Compress Events: true
+      Communications Timeout: 30 secs
+      Batch Size: 1000
+
+  - name: Why-Naming-Is-So-Hard
+    class: org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask
+    comment:
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 30 sec
+    Properties:
+      Input Port Name: provenance
+      Destination URL: http://localhost:8083/
+      Instance URL: http://${hostname(true)}:8081/nifi
+      Compress Events: true
+      Communications Timeout: 30 secs
+      Batch Size: 1000
+
+  - name: Monitor-Memor-inator
+    id: e415284f-0170-1000-1174-52ac9c53c22b
+    class: org.apache.nifi.controller.MonitorMemory
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 5 mins
+    Properties:
+      Usage Threshold: 1%
\ No newline at end of file
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
index ff9c9bf..d6b36ef 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
@@ -36,6 +36,7 @@ import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CO
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CORE_PROPS_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOWFILE_REPO_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOW_CONTROLLER_PROPS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.GENERAL_REPORTING_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NIFI_PROPERTIES_OVERRIDES_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPORTING_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPO_KEY;
@@ -54,6 +55,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
     public static final String FOUND_THE_FOLLOWING_DUPLICATE_CONTROLLER_SERVICE_IDS = "Found the following duplicate controller service ids: ";
     public static final String FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS = "Found the following duplicate connection ids: ";
     public static final String FOUND_THE_FOLLOWING_DUPLICATE_FUNNEL_IDS = "Found the following duplicate funnel ids: ";
+    public static final String FOUND_THE_FOLLOWING_DUPLICATE_REPORTING_IDS = "Found the following duplicate reporting ids: ";
     public static String TOP_LEVEL_NAME = "top level";
     private FlowControllerSchema flowControllerProperties;
     private CorePropertiesSchema coreProperties;
@@ -63,6 +65,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
     private SecurityPropertiesSchema securityProperties;
     private ProcessGroupSchema processGroupSchema;
     private ProvenanceReportingSchema provenanceReportingProperties;
+    private List<ReportingSchema> reportingTasks;
 
     private ProvenanceRepositorySchema provenanceRepositorySchema;
 
@@ -86,6 +89,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
         processGroupSchema = new ProcessGroupSchema(map, TOP_LEVEL_NAME);
 
         provenanceReportingProperties = getMapAsType(map, PROVENANCE_REPORTING_KEY, ProvenanceReportingSchema.class, TOP_LEVEL_NAME, false, false);
+        reportingTasks = getOptionalKeyAsList(map, GENERAL_REPORTING_KEY, ReportingSchema::new, TOP_LEVEL_NAME);
 
         nifiPropertiesOverrides = (Map<String, String>) map.get(NIFI_PROPERTIES_OVERRIDES_KEY);
         if (nifiPropertiesOverrides == null) {
@@ -100,6 +104,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
         addIssuesIfNotNull(securityProperties);
         addIssuesIfNotNull(processGroupSchema);
         addIssuesIfNotNull(provenanceReportingProperties);
+        addIssuesIfNotNull(reportingTasks);
         addIssuesIfNotNull(provenanceRepositorySchema);
 
         List<ProcessGroupSchema> allProcessGroups = getAllProcessGroups(processGroupSchema);
@@ -117,6 +122,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
                 .flatMap(r -> r.getOutputPorts().stream()).map(RemotePortSchema::getId).collect(Collectors.toList());
         List<String> allInputPortIds = allProcessGroups.stream().flatMap(p -> p.getInputPortSchemas().stream()).map(PortSchema::getId).collect(Collectors.toList());
         List<String> allOutputPortIds = allProcessGroups.stream().flatMap(p -> p.getOutputPortSchemas().stream()).map(PortSchema::getId).collect(Collectors.toList());
+        List<String> allReportingIds = reportingTasks.stream().map(ReportingSchema::getId).collect(Collectors.toList());
 
         checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS, allProcessorIds);
         checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_CONTROLLER_SERVICE_IDS, allControllerServiceIds);
@@ -124,6 +130,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
         checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS, allConnectionIds);
         checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_INPUT_PORT_IDS, allInputPortIds);
         checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_OUTPUT_PORT_IDS, allOutputPortIds);
+        checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REPORTING_IDS, allReportingIds);
 
         // Potential connection sources and destinations need to have unique ids
         CollectionOverlap<String> overlapResults = new CollectionOverlap<>(new HashSet<>(allProcessorIds), new HashSet<>(allRemoteInputPortIds), new HashSet<>(allRemoteOutputPortIds),
@@ -167,6 +174,10 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
         putIfNotNull(result, SECURITY_PROPS_KEY, securityProperties);
         result.putAll(processGroupSchema.toMap());
         putIfNotNull(result, PROVENANCE_REPORTING_KEY, provenanceReportingProperties);
+        if (!reportingTasks.isEmpty()) {
+            // for backward compatibility
+            putListIfNotNull(result, GENERAL_REPORTING_KEY, reportingTasks);
+        }
         result.put(NIFI_PROPERTIES_OVERRIDES_KEY, nifiPropertiesOverrides);
         return result;
     }
@@ -207,6 +218,10 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
         return provenanceReportingProperties;
     }
 
+    public List<ReportingSchema> getReportingTasksSchema() {
+        return reportingTasks;
+    }
+
     public ComponentStatusRepositorySchema getComponentStatusRepositoryProperties() {
         return componentStatusRepositoryProperties;
     }
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java
index d197ccb..882081b 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java
@@ -21,19 +21,20 @@ import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
 import org.apache.nifi.minifi.commons.schema.common.WritableSchema;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 
+import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema.TIMEOUT_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CLASS_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMMENT_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROPERTIES_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPORTING_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_PERIOD_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_STRATEGY_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.USE_COMPRESSION_KEY;
-import static org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema.TIMEOUT_KEY;
 
-/**
- *
- */
 public class ProvenanceReportingSchema extends BaseSchema implements WritableSchema {
+    public static final String DEFAULT_PROV_REPORTING_TASK_CLASS = "org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask";
     public static final String DESTINATION_URL_KEY = "destination url";
     public static final String PORT_NAME_KEY = "port name";
     public static final String ORIGINATING_URL_KEY = "originating url";
@@ -54,6 +55,7 @@ public class ProvenanceReportingSchema extends BaseSchema implements WritableSch
     private Boolean useCompression = DEFAULT_USE_COMPRESSION;
     private String timeout = DEFAULT_TIMEOUT;
     private Number batchSize = DEFAULT_BATCH_SIZE;
+    private String SSL;
 
     public ProvenanceReportingSchema(Map map) {
         schedulingStrategy = getRequiredKeyAsType(map, SCHEDULING_STRATEGY_KEY, String.class, PROVENANCE_REPORTING_KEY);
@@ -64,18 +66,38 @@ public class ProvenanceReportingSchema extends BaseSchema implements WritableSch
                 addValidationIssue(SCHEDULING_STRATEGY_KEY, PROVENANCE_REPORTING_KEY, "it is not a valid scheduling strategy");
             }
         }
-
         schedulingPeriod = getRequiredKeyAsType(map, SCHEDULING_PERIOD_KEY, String.class, PROVENANCE_REPORTING_KEY);
-        destinationUrl = getRequiredKeyAsType(map, DESTINATION_URL_KEY, String.class, PROVENANCE_REPORTING_KEY);
-        portName = getRequiredKeyAsType(map, PORT_NAME_KEY, String.class, PROVENANCE_REPORTING_KEY);
-
         comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, PROVENANCE_REPORTING_KEY, "");
+
         originatingUrl = getOptionalKeyAsType(map, ORIGINATING_URL_KEY, String.class, PROVENANCE_REPORTING_KEY, DEFAULT_ORGINATING_URL);
+        destinationUrl = getRequiredKeyAsType(map, DESTINATION_URL_KEY, String.class, PROVENANCE_REPORTING_KEY);
+        portName = getRequiredKeyAsType(map, PORT_NAME_KEY, String.class, PROVENANCE_REPORTING_KEY);
         useCompression = getOptionalKeyAsType(map, USE_COMPRESSION_KEY, Boolean.class, PROVENANCE_REPORTING_KEY, DEFAULT_USE_COMPRESSION);
         timeout = getOptionalKeyAsType(map, TIMEOUT_KEY, String.class, PROVENANCE_REPORTING_KEY, DEFAULT_TIMEOUT);
         batchSize = getOptionalKeyAsType(map, BATCH_SIZE_KEY, Number.class, PROVENANCE_REPORTING_KEY, DEFAULT_BATCH_SIZE);
     }
 
+    /** Builds a {@link ReportingSchema} for {@link #DEFAULT_PROV_REPORTING_TASK_CLASS} from this schema's fields. */
+    public ReportingSchema convert() {
+        Map<String, Object> properties = new HashMap<>();
+        properties.put("Destination URL", destinationUrl);
+        properties.put("Input Port Name", portName);
+        properties.put("Instance URL", originatingUrl);
+        properties.put("Compress Events", useCompression);
+        properties.put("Batch Size", batchSize);
+        properties.put("Communications Timeout", timeout);
+        // SSL stays null until setSSL is called; store "" (what setSSL(false) stores) instead of a null value.
+        properties.put("SSL Context Service", SSL == null ? "" : SSL);
+
+        Map<String, Object> target = mapSupplier.get();
+        target.put(CLASS_KEY, DEFAULT_PROV_REPORTING_TASK_CLASS);
+        target.put(COMMENT_KEY, comment);
+        target.put(SCHEDULING_STRATEGY_KEY, schedulingStrategy);
+        target.put(SCHEDULING_PERIOD_KEY, schedulingPeriod);
+        target.put(PROPERTIES_KEY, properties);
+        return new ReportingSchema(target);
+    }
+
     @Override
     public Map<String, Object> toMap() {
         Map<String, Object> result = mapSupplier.get();
@@ -91,6 +113,10 @@ public class ProvenanceReportingSchema extends BaseSchema implements WritableSch
         return result;
     }
 
+    public void setSSL(boolean useSSL) {
+        this.SSL = !useSSL ? "" : "SSL-Context-Service"; // empty string when SSL is off
+    }
+
     public String getComment() {
         return comment;
     }
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ReportingSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ReportingSchema.java
new file mode 100644
index 0000000..a833b57
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ReportingSchema.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.BaseSchemaWithIdAndName;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CLASS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMMENT_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.DEFAULT_PROPERTIES;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROPERTIES_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_PERIOD_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_STRATEGY_KEY;
+import static org.apache.nifi.minifi.commons.schema.ProcessorSchema.IT_IS_NOT_A_VALID_SCHEDULING_STRATEGY;
+
+/**
+ * Schema for one reporting task entry: the task's implementation class, its scheduling
+ * strategy and period, an optional comment, and its properties.
+ */
+public class ReportingSchema extends BaseSchemaWithIdAndName {
+    private String schedulingStrategy;
+    private String schedulingPeriod;
+    private String comment;
+
+    private String reportingClass;
+    private Map<String, Object> properties;
+
+    /**
+     * Populates the schema from a configuration map.
+     *
+     * <p>If the id is null or empty once the superclass constructor has run, a random UUID is
+     * assigned. A scheduling strategy that is present but rejected by
+     * {@link #isSchedulingStrategy(String)} is recorded as a validation issue.
+     *
+     * @param map the configuration entries describing this reporting task
+     */
+    public ReportingSchema(Map map) {
+        super(map, "Reporting(id: {id}, name: {name})");
+        String id = getId();
+        // Null-safe check: a null id gets a generated one instead of raising a NullPointerException.
+        if (id == null || id.isEmpty()) {
+            // MiNiFi will throw an error if it can not find `id` of BaseSchemaWithIdAndName for YML config version 3
+            setId(UUID.randomUUID().toString());
+        }
+        String wrapperName = getWrapperName();
+        reportingClass = getRequiredKeyAsType(map, CLASS_KEY, String.class, wrapperName);
+        schedulingStrategy = getRequiredKeyAsType(map, SCHEDULING_STRATEGY_KEY, String.class, wrapperName);
+        if (schedulingStrategy != null && !isSchedulingStrategy(schedulingStrategy)) {
+            addValidationIssue(SCHEDULING_STRATEGY_KEY, wrapperName, IT_IS_NOT_A_VALID_SCHEDULING_STRATEGY);
+        }
+        schedulingPeriod = getRequiredKeyAsType(map, SCHEDULING_PERIOD_KEY, String.class, wrapperName);
+        comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, wrapperName, "");
+        properties = getOptionalKeyAsType(map, PROPERTIES_KEY, Map.class, wrapperName, DEFAULT_PROPERTIES);
+    }
+
+    /**
+     * Tells whether a string names a {@link SchedulingStrategy} value.
+     *
+     * @param string the candidate name; may be {@code null}
+     * @return {@code true} if {@code SchedulingStrategy.valueOf(string)} succeeds, {@code false} otherwise
+     */
+    public static boolean isSchedulingStrategy(String string) {
+        if (string == null) {
+            return false;
+        }
+        try {
+            SchedulingStrategy.valueOf(string);
+            return true;
+        } catch (IllegalArgumentException ignored) {
+            // valueOf rejects an unknown name by throwing; here that only means "not a strategy".
+            return false;
+        }
+    }
+
+    /**
+     * Returns the superclass map extended with this task's class, comment, scheduling strategy,
+     * scheduling period, and a copy of its properties.
+     *
+     * @return the map representation of this reporting task
+     */
+    @Override
+    public Map<String, Object> toMap() {
+        Map<String, Object> result = super.toMap();
+        result.put(CLASS_KEY, reportingClass);
+        result.put(COMMENT_KEY, comment);
+        result.put(SCHEDULING_STRATEGY_KEY, schedulingStrategy);
+        result.put(SCHEDULING_PERIOD_KEY, schedulingPeriod);
+        result.put(PROPERTIES_KEY, new HashMap<>(properties));
+        return result;
+    }
+
+    /** @return the comment read from the configuration map (the constructor passes {@code ""} as its default) */
+    public String getComment() {
+        return comment;
+    }
+
+    /** @return the scheduling strategy name read from the configuration map */
+    public String getSchedulingStrategy() {
+        return schedulingStrategy;
+    }
+
+    /** @return the scheduling period read from the configuration map */
+    public String getSchedulingPeriod() {
+        return schedulingPeriod;
+    }
+
+    /** @return the reporting task class name read from the configuration map */
+    public String getReportingClass() {
+        return reportingClass;
+    }
+
+    /** @return the properties map held by this schema (the map itself, not a copy) */
+    public Map<String, Object> getProperties() {
+        return properties;
+    }
+}
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java
index c52f7fe..99d0086 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java
@@ -32,6 +32,7 @@ public class CommonPropertyKeys {
     public static final String PROCESSORS_KEY = "Processors";
     public static final String CONNECTIONS_KEY = "Connections";
     public static final String PROVENANCE_REPORTING_KEY = "Provenance Reporting";
+    public static final String GENERAL_REPORTING_KEY = "Reporting Tasks";
     public static final String REMOTE_PROCESS_GROUPS_KEY = "Remote Process Groups";
     public static final String INPUT_PORTS_KEY = "Input Ports";
     public static final String OUTPUT_PORTS_KEY = "Output Ports";