You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2020/06/12 20:51:09 UTC

[nifi-minifi] branch master updated (2e0970b -> 5a95f60)

This is an automated email from the ASF dual-hosted git repository.

aldrin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi.git.


    from 2e0970b  MINIFI-521 Change logic around how updated RPG logic is applied to handle all subsequent versions
     new e44da56  MINIFI-492 - Generic reporting tasks
     new 5a95f60  MINIFI-521 Adding missing license headers for test resources.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../minifi/bootstrap/util/ConfigTransformer.java   |  68 +++----
 .../bootstrap/util/ConfigTransformerTest.java      |  26 +++
 .../src/test/resources/config-reporting-task.yml   | 218 +++++++++++++++++++++
 .../nifi/minifi/commons/schema/ConfigSchema.java   |  15 ++
 .../commons/schema/ProvenanceReportingSchema.java  |  42 +++-
 .../minifi/commons/schema/ReportingSchema.java     |  98 +++++++++
 .../commons/schema/common/CommonPropertyKeys.java  |   1 +
 .../resources/MINIFI-521_1.3_TemplateEncoding.xml  |  14 ++
 .../resources/MINIFI-521_1.3_TemplateEncoding.yml  |  14 ++
 9 files changed, 454 insertions(+), 42 deletions(-)
 create mode 100644 minifi-bootstrap/src/test/resources/config-reporting-task.yml
 create mode 100644 minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ReportingSchema.java


[nifi-minifi] 01/02: MINIFI-492 - Generic reporting tasks

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aldrin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi.git

commit e44da568d8c099cda7b7fa4db6722d8b66aa9562
Author: Nghia Le <mi...@gmail.com>
AuthorDate: Mon May 25 22:46:14 2020 +0200

    MINIFI-492 - Generic reporting tasks
    
    This closes #187.
---
 .../minifi/bootstrap/util/ConfigTransformer.java   |  68 +++----
 .../bootstrap/util/ConfigTransformerTest.java      |  26 +++
 .../src/test/resources/config-reporting-task.yml   | 218 +++++++++++++++++++++
 .../nifi/minifi/commons/schema/ConfigSchema.java   |  15 ++
 .../commons/schema/ProvenanceReportingSchema.java  |  42 +++-
 .../minifi/commons/schema/ReportingSchema.java     |  98 +++++++++
 .../commons/schema/common/CommonPropertyKeys.java  |   1 +
 7 files changed, 426 insertions(+), 42 deletions(-)

diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
index da4af51..677080c 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
@@ -38,6 +38,7 @@ import org.apache.nifi.minifi.commons.schema.ProvenanceReportingSchema;
 import org.apache.nifi.minifi.commons.schema.ProvenanceRepositorySchema;
 import org.apache.nifi.minifi.commons.schema.RemotePortSchema;
 import org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema;
+import org.apache.nifi.minifi.commons.schema.ReportingSchema;
 import org.apache.nifi.minifi.commons.schema.SecurityPropertiesSchema;
 import org.apache.nifi.minifi.commons.schema.SensitivePropsSchema;
 import org.apache.nifi.minifi.commons.schema.SwapSchema;
@@ -81,7 +82,6 @@ public final class ConfigTransformer {
     // Underlying version of NIFI will be using
     public static final String NIFI_VERSION = "1.8.0";
     public static final String ROOT_GROUP = "Root-Group";
-    public static final String DEFAULT_PROV_REPORTING_TASK_CLASS = "org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask";
     public static final String NIFI_VERSION_KEY = "nifi.version";
 
     public static final Logger logger = LoggerFactory.getLogger(ConfigTransformer.class);
@@ -178,7 +178,7 @@ public final class ConfigTransformer {
             ProvenanceRepositorySchema provenanceRepositorySchema = configSchema.getProvenanceRepositorySchema();
 
             OrderedProperties orderedProperties = new OrderedProperties();
-            orderedProperties.setProperty(NIFI_VERSION_KEY, NIFI_VERSION,"# Core Properties #" + System.lineSeparator());
+            orderedProperties.setProperty(NIFI_VERSION_KEY, NIFI_VERSION, "# Core Properties #" + System.lineSeparator());
             orderedProperties.setProperty("nifi.flow.configuration.file", "./conf/flow.xml.gz");
             orderedProperties.setProperty("nifi.flow.configuration.archive.enabled", "false");
             orderedProperties.setProperty("nifi.flow.configuration.archive.dir", "./conf/archive/");
@@ -288,7 +288,7 @@ public final class ConfigTransformer {
         }
     }
 
-    protected static DOMSource createFlowXml(ConfigSchema configSchema) throws IOException, ConfigurationChangeException, ConfigTransformerException{
+    protected static DOMSource createFlowXml(ConfigSchema configSchema) throws IOException, ConfigurationChangeException, ConfigTransformerException {
         try {
             // create a new, empty document
             final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
@@ -317,9 +317,10 @@ public final class ConfigTransformer {
             addProcessGroup(doc, element, processGroupSchema, new ParentGroupIdResolver(processGroupSchema));
 
             SecurityPropertiesSchema securityProperties = configSchema.getSecurityProperties();
-            if (securityProperties.useSSL()) {
+            boolean useSSL = securityProperties.useSSL();
+            if (useSSL) {
                 Element controllerServicesNode = doc.getElementById("controllerServices");
-                if(controllerServicesNode == null) {
+                if (controllerServicesNode == null) {
                     controllerServicesNode = doc.createElement("controllerServices");
                 }
 
@@ -327,17 +328,27 @@ public final class ConfigTransformer {
                 addSSLControllerService(controllerServicesNode, securityProperties);
             }
 
+            List<ReportingSchema> reportingTasks = configSchema.getReportingTasksSchema();
             ProvenanceReportingSchema provenanceProperties = configSchema.getProvenanceReportingProperties();
             if (provenanceProperties != null) {
+                provenanceProperties.setSSL(useSSL);
+                ReportingSchema provenance = provenanceProperties.convert();
+                provenance.setId("Provenance-Reporting");
+                provenance.setName("Site-To-Site-Provenance-Reporting");
+                reportingTasks.add(provenance);
+            }
+            if (reportingTasks != null) {
                 final Element reportingTasksNode = doc.createElement("reportingTasks");
                 rootNode.appendChild(reportingTasksNode);
-                addProvenanceReportingTask(reportingTasksNode, configSchema);
+                for (ReportingSchema task : reportingTasks) {
+                    addReportingTask(reportingTasksNode, task);
+                }
             }
 
             return new DOMSource(doc);
         } catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException e) {
             throw new ConfigTransformerException(e);
-        } catch (Exception e){
+        } catch (Exception e) {
             throw new ConfigTransformerException("Failed to parse the config YAML while writing the top level of the flow xml", e);
         }
     }
@@ -384,7 +395,7 @@ public final class ConfigTransformer {
             addConfiguration(serviceElement, attributes);
 
             String annotationData = controllerServiceSchema.getAnnotationData();
-            if(annotationData != null && !annotationData.isEmpty()) {
+            if (annotationData != null && !annotationData.isEmpty()) {
                 addTextElement(element, "annotationData", annotationData);
             }
 
@@ -480,7 +491,7 @@ public final class ConfigTransformer {
             addTextElement(element, "runDurationNanos", String.valueOf(processorConfig.getRunDurationNanos()));
 
             String annotationData = processorConfig.getAnnotationData();
-            if(annotationData != null && !annotationData.isEmpty()) {
+            if (annotationData != null && !annotationData.isEmpty()) {
                 addTextElement(element, "annotationData", annotationData);
             }
 
@@ -508,34 +519,23 @@ public final class ConfigTransformer {
         addPosition(element);
     }
 
-    protected static void addProvenanceReportingTask(final Element element, ConfigSchema configSchema) throws ConfigurationChangeException {
+    protected static void addReportingTask(final Element parentElement, ReportingSchema reportingSchema) throws ConfigurationChangeException {
         try {
-            ProvenanceReportingSchema provenanceProperties = configSchema.getProvenanceReportingProperties();
-            final Element taskElement = element.getOwnerDocument().createElement("reportingTask");
-            addTextElement(taskElement, "id", "Provenance-Reporting");
-            addTextElement(taskElement, "name", "Site-To-Site-Provenance-Reporting");
-            addTextElement(taskElement, "comment", provenanceProperties.getComment());
-            addTextElement(taskElement, "class", DEFAULT_PROV_REPORTING_TASK_CLASS);
-            addTextElement(taskElement, "schedulingPeriod", provenanceProperties.getSchedulingPeriod());
-            addTextElement(taskElement, "scheduledState", "RUNNING");
-            addTextElement(taskElement, "schedulingStrategy", provenanceProperties.getSchedulingStrategy());
+            final Document doc = parentElement.getOwnerDocument();
+            final Element element = doc.createElement("reportingTask");
+            parentElement.appendChild(element);
 
-            Map<String, Object> attributes = new HashMap<>();
-            attributes.put("Destination URL", provenanceProperties.getDestinationUrl());
-            attributes.put("Input Port Name", provenanceProperties.getPortName());
-            attributes.put("Instance URL", provenanceProperties.getOriginatingUrl());
-            attributes.put("Compress Events", provenanceProperties.getUseCompression());
-            attributes.put("Batch Size", provenanceProperties.getBatchSize());
-            attributes.put("Communications Timeout", provenanceProperties.getTimeout());
-
-            SecurityPropertiesSchema securityProps = configSchema.getSecurityProperties();
-            if (securityProps.useSSL()) {
-                attributes.put("SSL Context Service", "SSL-Context-Service");
-            }
+            addTextElement(element, "id", reportingSchema.getId());
+            addTextElement(element, "name", reportingSchema.getName());
+            addTextElement(element, "comment", reportingSchema.getComment());
+            addTextElement(element, "class", reportingSchema.getReportingClass());
+            addTextElement(element, "schedulingPeriod", reportingSchema.getSchedulingPeriod());
+            addTextElement(element, "scheduledState", "RUNNING");
+            addTextElement(element, "schedulingStrategy", reportingSchema.getSchedulingStrategy());
 
-            addConfiguration(taskElement, attributes);
+            addConfiguration(element, reportingSchema.getProperties());
 
-            element.appendChild(taskElement);
+            parentElement.appendChild(element);
         } catch (Exception e) {
             throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the Provenance Reporting Task", e);
         }
@@ -725,7 +725,7 @@ public final class ConfigTransformer {
             "# distributed under the License is distributed on an \"AS IS\" BASIS,\n" +
             "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
             "# See the License for the specific language governing permissions and\n" +
-            "# limitations under the License.\n"+
+            "# limitations under the License.\n" +
             "\n";
 
 }
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
index 32a5852..88bc318 100644
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
@@ -29,6 +29,7 @@ import org.apache.nifi.minifi.commons.schema.ProcessGroupSchema;
 import org.apache.nifi.minifi.commons.schema.ProcessorSchema;
 import org.apache.nifi.minifi.commons.schema.RemotePortSchema;
 import org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema;
+import org.apache.nifi.minifi.commons.schema.ReportingSchema;
 import org.apache.nifi.minifi.commons.schema.common.StringUtil;
 import org.apache.nifi.minifi.commons.schema.exception.SchemaLoaderException;
 import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
@@ -123,6 +124,11 @@ public class ConfigTransformerTest {
     }
 
     @Test
+    public void testReportingTasksTransform() throws Exception {
+        testConfigFileTransform("config-reporting-task.yml");
+    }
+
+    @Test
     public void testProcessGroupsTransform() throws Exception {
         testConfigFileTransform("config-process-groups.yml");
     }
@@ -507,6 +513,7 @@ public class ConfigTransformerTest {
         Document document = documentBuilder.parse(new ByteArrayInputStream(outputStream.toByteArray()));
 
         testProcessGroup((Element) xPathFactory.newXPath().evaluate("flowController/rootGroup", document, XPathConstants.NODE), configSchema.getProcessGroupSchema());
+        testReportingTasks((Element) xPathFactory.newXPath().evaluate("flowController/reportingTasks", document, XPathConstants.NODE), configSchema.getReportingTasksSchema());
     }
 
     private void testProcessGroup(Element element, ProcessGroupSchema processGroupSchema) throws XPathExpressionException {
@@ -579,6 +586,25 @@ public class ConfigTransformerTest {
         testProperties(element, processorSchema.getProperties());
     }
 
+    private void testReportingTasks(Element element, List<ReportingSchema> reportingSchemas) throws XPathExpressionException {
+        NodeList taskElements = (NodeList) xPathFactory.newXPath().evaluate("reportingTask", element, XPathConstants.NODESET);
+        assertEquals(reportingSchemas.size(), taskElements.getLength());
+        for (int i = 0; i < taskElements.getLength(); i++) {
+            testReportingTask((Element) taskElements.item(i), reportingSchemas.get(i));
+        }
+    }
+
+    private void testReportingTask(Element element, ReportingSchema reportingSchema) throws XPathExpressionException {
+        assertEquals(reportingSchema.getId(), getText(element, "id"));
+        assertEquals(reportingSchema.getName(), getText(element, "name"));
+        assertEquals(reportingSchema.getComment(), getText(element, "comment"));
+        assertEquals(reportingSchema.getReportingClass(), getText(element, "class"));
+        assertEquals(reportingSchema.getSchedulingPeriod(), getText(element, "schedulingPeriod"));
+        assertEquals(reportingSchema.getSchedulingStrategy(), getText(element, "schedulingStrategy"));
+
+        testProperties(element, reportingSchema.getProperties());
+    }
+
     private void testControllerService(Element element, ControllerServiceSchema controllerServiceSchema) throws XPathExpressionException {
         assertEquals(controllerServiceSchema.getId(), getText(element, "id"));
         assertEquals(controllerServiceSchema.getName(), getText(element, "name"));
diff --git a/minifi-bootstrap/src/test/resources/config-reporting-task.yml b/minifi-bootstrap/src/test/resources/config-reporting-task.yml
new file mode 100644
index 0000000..b746316
--- /dev/null
+++ b/minifi-bootstrap/src/test/resources/config-reporting-task.yml
@@ -0,0 +1,218 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+MiNiFi Config Version: 3
+
+Flow Controller:
+  name: MiNiFi Flow
+  comment:
+
+Core Properties:
+  flow controller graceful shutdown period: 10 sec
+  flow service write delay interval: 500 ms
+  administrative yield duration: 30 sec
+  bored yield duration: 10 millis
+  max concurrent threads: 1
+
+FlowFile Repository:
+  partitions: 256
+  checkpoint interval: 2 mins
+  always sync: false
+  Swap:
+    threshold: 20000
+    in period: 5 sec
+    in threads: 1
+    out period: 5 sec
+    out threads: 4
+
+Content Repository:
+  content claim max appendable size: 10 MB
+  content claim max flow files: 100
+  always sync: false
+
+Component Status Repository:
+  buffer size: 1440
+  snapshot frequency: 1 min
+
+Security Properties:
+  keystore: ''
+  keystore type: ''
+  keystore password: ''
+  key password: ''
+  truststore: ''
+  truststore type: ''
+  truststore password: ''
+  ssl protocol: ''
+  Sensitive Props:
+    key: ''
+    algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
+    provider: BC
+
+Processors:
+  - name: TailAppLog
+    id: c05f74a3-9f8f-3bb7-bced-f4735f962ec8
+    class: org.apache.nifi.processors.standard.TailFile
+    max concurrent tasks: 1
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 10 sec
+    penalization period: 30 sec
+    yield period: 1 sec
+    run duration nanos: 0
+    auto-terminated relationships list:
+    Properties:
+      File to Tail: logs/minifi-app.log
+      Rolling Filename Pattern: minifi-app*
+      Initial Start Position: Beginning of File
+  - name: SplitIntoSingleLines
+    id: afbd6a07-64a0-39b3-953e-65b4678c5177
+    class: org.apache.nifi.processors.standard.SplitText
+    max concurrent tasks: 1
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 0 sec
+    penalization period: 30 sec
+    yield period: 1 sec
+    run duration nanos: 0
+    auto-terminated relationships list:
+      - failure
+      - original
+    Properties:
+      Line Split Count: 1
+      Header Line Count: 0
+      Remove Trailing Newlines: true
+  - name: RouteErrors
+    id: c48394b5-39a5-3bd9-bf01-0bff0e0ad28d
+    class: org.apache.nifi.processors.standard.RouteText
+    max concurrent tasks: 1
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 0 sec
+    penalization period: 30 sec
+    yield period: 1 sec
+    run duration nanos: 0
+    auto-terminated relationships list:
+      - unmatched
+      - original
+    Properties:
+      Routing Strategy: Route to 'matched' if line matches all conditions
+      Matching Strategy: Contains
+      Character Set: UTF-8
+      Ignore Leading/Trailing Whitespace: true
+      Ignore Case: true
+      Grouping Regular Expression:
+      WALFFR: WriteAheadFlowFileRepository
+  - name: PutFile
+    id: 1d80a4d8-86ae-3994-a988-2d7a46949091
+    class: org.apache.nifi.processors.standard.PutFile
+    max concurrent tasks: 1
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 0 sec
+    penalization period: 30 sec
+    yield period: 1 sec
+    run duration nanos: 0
+    auto-terminated relationships list:
+      - failure
+      - success
+    Properties:
+      Directory: ./
+      Conflict Resolution Strategy: replace
+      Create Missing Directories: true
+      Maximum File Count:
+      Last Modified Time:
+      Permissions:
+      Owner:
+      Group:
+
+Connections:
+  - name: TailToSplit
+    id: 0401b747-1dca-31c7-ab4b-cdacf7e6c44b
+    source name: TailAppLog
+    source id: c05f74a3-9f8f-3bb7-bced-f4735f962ec8
+    source relationship names:
+      - success
+    destination name: SplitIntoSingleLines
+    destination id: afbd6a07-64a0-39b3-953e-65b4678c5177
+    max work queue size: 0
+    max work queue data size: 1 MB
+    flowfile expiration: 60 sec
+    queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer
+  - name: SplitToRoute
+    id: 093f9479-e2f0-3eb2-8184-151f42f22f34
+    source name: SplitIntoSingleLines
+    source id: afbd6a07-64a0-39b3-953e-65b4678c5177
+    source relationship names:
+      - splits
+    destination name: RouteErrors
+    destination id: c48394b5-39a5-3bd9-bf01-0bff0e0ad28d
+    max work queue size: 0
+    max work queue data size: 1 MB
+    flowfile expiration: 60 sec
+    queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer
+  - name: RouteToS2S
+    id: aacad63c-7e4c-31c9-888b-111e07c6fd22
+    source name: RouteErrors
+    source id: c48394b5-39a5-3bd9-bf01-0bff0e0ad28d
+    source relationship names:
+      - matched
+    destination name: PutFile
+    destination id: 1d80a4d8-86ae-3994-a988-2d7a46949091
+    max work queue size: 0
+    max work queue data size: 1 MB
+    flowfile expiration: 60 sec
+    queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer
+
+Provenance Reporting:
+  comment: old school config style
+  scheduling strategy: TIMER_DRIVEN
+  scheduling period: 30 sec
+  port name: provenance
+  destination url: http://localhost:8080/
+  originating url: http://${hostname(true)}:8081/nifi
+  use compression: true
+  timeout: 30 secs
+  batch size: 1000
+
+Reporting Tasks:
+  - name: Viet-Nam
+    class: org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask
+    comment: this sentence has three a’s, two c’s, two d’s, twenty-eight e’s, four f’s, four g’s, ten h’s, eight i’s, two l’s, eleven n’s, six o’s, seven r’s, twenty-seven s’s, eighteen t’s, three u’s, five v’s, six w’s, three x’s, and three y’s.
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 30 sec
+    Properties:
+      Input Port Name: provenance
+      Destination URL: http://localhost:8082/
+      Instance URL: http://${hostname(true)}:8081/nifi
+      Compress Events: true
+      Communications Timeout: 30 secs
+      Batch Size: 1000
+
+  - name: Why-Naming-Is-So-Hard
+    class: org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask
+    comment:
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 30 sec
+    Properties:
+      Input Port Name: provenance
+      Destination URL: http://localhost:8083/
+      Instance URL: http://${hostname(true)}:8081/nifi
+      Compress Events: true
+      Communications Timeout: 30 secs
+      Batch Size: 1000
+
+  - name: Monitor-Memor-inator
+    id: e415284f-0170-1000-1174-52ac9c53c22b
+    class: org.apache.nifi.controller.MonitorMemory
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 5 mins
+    Properties:
+      Usage Threshold: 1%
\ No newline at end of file
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
index ff9c9bf..d6b36ef 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
@@ -36,6 +36,7 @@ import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CO
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CORE_PROPS_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOWFILE_REPO_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOW_CONTROLLER_PROPS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.GENERAL_REPORTING_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NIFI_PROPERTIES_OVERRIDES_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPORTING_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPO_KEY;
@@ -54,6 +55,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
     public static final String FOUND_THE_FOLLOWING_DUPLICATE_CONTROLLER_SERVICE_IDS = "Found the following duplicate controller service ids: ";
     public static final String FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS = "Found the following duplicate connection ids: ";
     public static final String FOUND_THE_FOLLOWING_DUPLICATE_FUNNEL_IDS = "Found the following duplicate funnel ids: ";
+    public static final String FOUND_THE_FOLLOWING_DUPLICATE_REPORTING_IDS = "Found the following duplicate reporting ids: ";
     public static String TOP_LEVEL_NAME = "top level";
     private FlowControllerSchema flowControllerProperties;
     private CorePropertiesSchema coreProperties;
@@ -63,6 +65,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
     private SecurityPropertiesSchema securityProperties;
     private ProcessGroupSchema processGroupSchema;
     private ProvenanceReportingSchema provenanceReportingProperties;
+    private List<ReportingSchema> reportingTasks;
 
     private ProvenanceRepositorySchema provenanceRepositorySchema;
 
@@ -86,6 +89,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
         processGroupSchema = new ProcessGroupSchema(map, TOP_LEVEL_NAME);
 
         provenanceReportingProperties = getMapAsType(map, PROVENANCE_REPORTING_KEY, ProvenanceReportingSchema.class, TOP_LEVEL_NAME, false, false);
+        reportingTasks = getOptionalKeyAsList(map, GENERAL_REPORTING_KEY, ReportingSchema::new, TOP_LEVEL_NAME);
 
         nifiPropertiesOverrides = (Map<String, String>) map.get(NIFI_PROPERTIES_OVERRIDES_KEY);
         if (nifiPropertiesOverrides == null) {
@@ -100,6 +104,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
         addIssuesIfNotNull(securityProperties);
         addIssuesIfNotNull(processGroupSchema);
         addIssuesIfNotNull(provenanceReportingProperties);
+        addIssuesIfNotNull(reportingTasks);
         addIssuesIfNotNull(provenanceRepositorySchema);
 
         List<ProcessGroupSchema> allProcessGroups = getAllProcessGroups(processGroupSchema);
@@ -117,6 +122,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
                 .flatMap(r -> r.getOutputPorts().stream()).map(RemotePortSchema::getId).collect(Collectors.toList());
         List<String> allInputPortIds = allProcessGroups.stream().flatMap(p -> p.getInputPortSchemas().stream()).map(PortSchema::getId).collect(Collectors.toList());
         List<String> allOutputPortIds = allProcessGroups.stream().flatMap(p -> p.getOutputPortSchemas().stream()).map(PortSchema::getId).collect(Collectors.toList());
+        List<String> allReportingIds = reportingTasks.stream().map(ReportingSchema::getId).collect(Collectors.toList());
 
         checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS, allProcessorIds);
         checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_CONTROLLER_SERVICE_IDS, allControllerServiceIds);
@@ -124,6 +130,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
         checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS, allConnectionIds);
         checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_INPUT_PORT_IDS, allInputPortIds);
         checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_OUTPUT_PORT_IDS, allOutputPortIds);
+        checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REPORTING_IDS, allReportingIds);
 
         // Potential connection sources and destinations need to have unique ids
         CollectionOverlap<String> overlapResults = new CollectionOverlap<>(new HashSet<>(allProcessorIds), new HashSet<>(allRemoteInputPortIds), new HashSet<>(allRemoteOutputPortIds),
@@ -167,6 +174,10 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
         putIfNotNull(result, SECURITY_PROPS_KEY, securityProperties);
         result.putAll(processGroupSchema.toMap());
         putIfNotNull(result, PROVENANCE_REPORTING_KEY, provenanceReportingProperties);
+        if (!reportingTasks.isEmpty()) {
+            // for backward compatibility
+            putListIfNotNull(result, GENERAL_REPORTING_KEY, reportingTasks);
+        }
         result.put(NIFI_PROPERTIES_OVERRIDES_KEY, nifiPropertiesOverrides);
         return result;
     }
@@ -207,6 +218,10 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
         return provenanceReportingProperties;
     }
 
+    public List<ReportingSchema> getReportingTasksSchema() {
+        return reportingTasks;
+    }
+
     public ComponentStatusRepositorySchema getComponentStatusRepositoryProperties() {
         return componentStatusRepositoryProperties;
     }
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java
index d197ccb..882081b 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java
@@ -21,19 +21,20 @@ import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
 import org.apache.nifi.minifi.commons.schema.common.WritableSchema;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 
+import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema.TIMEOUT_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CLASS_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMMENT_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROPERTIES_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPORTING_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_PERIOD_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_STRATEGY_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.USE_COMPRESSION_KEY;
-import static org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema.TIMEOUT_KEY;
 
-/**
- *
- */
 public class ProvenanceReportingSchema extends BaseSchema implements WritableSchema {
+    public static final String DEFAULT_PROV_REPORTING_TASK_CLASS = "org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask";
     public static final String DESTINATION_URL_KEY = "destination url";
     public static final String PORT_NAME_KEY = "port name";
     public static final String ORIGINATING_URL_KEY = "originating url";
@@ -54,6 +55,7 @@ public class ProvenanceReportingSchema extends BaseSchema implements WritableSch
     private Boolean useCompression = DEFAULT_USE_COMPRESSION;
     private String timeout = DEFAULT_TIMEOUT;
     private Number batchSize = DEFAULT_BATCH_SIZE;
+    private String SSL;
 
     public ProvenanceReportingSchema(Map map) {
         schedulingStrategy = getRequiredKeyAsType(map, SCHEDULING_STRATEGY_KEY, String.class, PROVENANCE_REPORTING_KEY);
@@ -64,18 +66,38 @@ public class ProvenanceReportingSchema extends BaseSchema implements WritableSch
                 addValidationIssue(SCHEDULING_STRATEGY_KEY, PROVENANCE_REPORTING_KEY, "it is not a valid scheduling strategy");
             }
         }
-
         schedulingPeriod = getRequiredKeyAsType(map, SCHEDULING_PERIOD_KEY, String.class, PROVENANCE_REPORTING_KEY);
-        destinationUrl = getRequiredKeyAsType(map, DESTINATION_URL_KEY, String.class, PROVENANCE_REPORTING_KEY);
-        portName = getRequiredKeyAsType(map, PORT_NAME_KEY, String.class, PROVENANCE_REPORTING_KEY);
-
         comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, PROVENANCE_REPORTING_KEY, "");
+
         originatingUrl = getOptionalKeyAsType(map, ORIGINATING_URL_KEY, String.class, PROVENANCE_REPORTING_KEY, DEFAULT_ORGINATING_URL);
+        destinationUrl = getRequiredKeyAsType(map, DESTINATION_URL_KEY, String.class, PROVENANCE_REPORTING_KEY);
+        portName = getRequiredKeyAsType(map, PORT_NAME_KEY, String.class, PROVENANCE_REPORTING_KEY);
         useCompression = getOptionalKeyAsType(map, USE_COMPRESSION_KEY, Boolean.class, PROVENANCE_REPORTING_KEY, DEFAULT_USE_COMPRESSION);
         timeout = getOptionalKeyAsType(map, TIMEOUT_KEY, String.class, PROVENANCE_REPORTING_KEY, DEFAULT_TIMEOUT);
         batchSize = getOptionalKeyAsType(map, BATCH_SIZE_KEY, Number.class, PROVENANCE_REPORTING_KEY, DEFAULT_BATCH_SIZE);
     }
 
+    public ReportingSchema convert() {
+        Map<String, Object> properties = new HashMap<>();
+        properties.put("Destination URL", destinationUrl);
+        properties.put("Input Port Name", portName);
+        properties.put("Instance URL", originatingUrl);
+        properties.put("Compress Events", useCompression);
+        properties.put("Batch Size", batchSize);
+        properties.put("Communications Timeout", timeout);
+        properties.put("SSL Context Service", SSL);
+
+        Map<String, Object> target = super.mapSupplier.get();
+        target.put(CLASS_KEY, DEFAULT_PROV_REPORTING_TASK_CLASS);
+        target.put(COMMENT_KEY, comment);
+        target.put(SCHEDULING_STRATEGY_KEY, schedulingStrategy);
+        target.put(SCHEDULING_PERIOD_KEY, schedulingPeriod);
+        target.put(PROPERTIES_KEY, properties);
+
+        ReportingSchema provenance = new ReportingSchema(target);
+        return provenance;
+    }
+
     @Override
     public Map<String, Object> toMap() {
         Map<String, Object> result = mapSupplier.get();
@@ -91,6 +113,10 @@ public class ProvenanceReportingSchema extends BaseSchema implements WritableSch
         return result;
     }
 
+    public void setSSL(boolean useSSL) {
+        SSL = useSSL ? "SSL-Context-Service" : "";
+    }
+
     public String getComment() {
         return comment;
     }
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ReportingSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ReportingSchema.java
new file mode 100644
index 0000000..a833b57
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ReportingSchema.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.BaseSchemaWithIdAndName;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CLASS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMMENT_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.DEFAULT_PROPERTIES;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROPERTIES_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_PERIOD_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_STRATEGY_KEY;
+import static org.apache.nifi.minifi.commons.schema.ProcessorSchema.IT_IS_NOT_A_VALID_SCHEDULING_STRATEGY;
+
+public class ReportingSchema extends BaseSchemaWithIdAndName {
+    private String schedulingStrategy;
+    private String schedulingPeriod;
+    private String comment;
+
+    private String reportingClass;
+    private Map<String, Object> properties = DEFAULT_PROPERTIES;
+
+    public ReportingSchema(Map map) {
+        super(map, "Reporting(id: {id}, name: {name})");
+        if (this.getId().equals("")) {
+            // MiNiFi will throw an error if it can not find `id` of BaseSchemaWithIdAndName for YML config version 3
+            this.setId(UUID.randomUUID().toString());
+        }
+        String wrapperName = getWrapperName();
+        reportingClass = getRequiredKeyAsType(map, CLASS_KEY, String.class, wrapperName);
+        schedulingStrategy = getRequiredKeyAsType(map, SCHEDULING_STRATEGY_KEY, String.class, wrapperName);
+        if (schedulingStrategy != null && !isSchedulingStrategy(schedulingStrategy)) {
+            addValidationIssue(SCHEDULING_STRATEGY_KEY, wrapperName, IT_IS_NOT_A_VALID_SCHEDULING_STRATEGY);
+        }
+        schedulingPeriod = getRequiredKeyAsType(map, SCHEDULING_PERIOD_KEY, String.class, wrapperName);
+        comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, wrapperName, "");
+        properties = getOptionalKeyAsType(map, PROPERTIES_KEY, Map.class, wrapperName, DEFAULT_PROPERTIES);
+    }
+
+    public static boolean isSchedulingStrategy(String string) {
+        try {
+            SchedulingStrategy.valueOf(string);
+        } catch (Exception e) {
+            return false;
+        }
+        return true;
+    }
+
+    public Map<String, Object> toMap() {
+        Map<String, Object> result = super.toMap();
+        result.put(CLASS_KEY, reportingClass);
+        result.put(COMMENT_KEY, comment);
+        result.put(SCHEDULING_STRATEGY_KEY, schedulingStrategy);
+        result.put(SCHEDULING_PERIOD_KEY, schedulingPeriod);
+        result.put(PROPERTIES_KEY, new HashMap<>(properties));
+        return result;
+    }
+
+    public String getComment() {
+        return comment;
+    }
+
+    public String getSchedulingStrategy() {
+        return schedulingStrategy;
+    }
+
+    public String getSchedulingPeriod() {
+        return schedulingPeriod;
+    }
+
+    public String getReportingClass() {
+        return reportingClass;
+    }
+
+    public Map<String, Object> getProperties() {
+        return properties;
+    }
+}
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java
index c52f7fe..99d0086 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java
@@ -32,6 +32,7 @@ public class CommonPropertyKeys {
     public static final String PROCESSORS_KEY = "Processors";
     public static final String CONNECTIONS_KEY = "Connections";
     public static final String PROVENANCE_REPORTING_KEY = "Provenance Reporting";
+    public static final String GENERAL_REPORTING_KEY = "Reporting Tasks";
     public static final String REMOTE_PROCESS_GROUPS_KEY = "Remote Process Groups";
     public static final String INPUT_PORTS_KEY = "Input Ports";
     public static final String OUTPUT_PORTS_KEY = "Output Ports";


[nifi-minifi] 02/02: MINIFI-521 Adding missing license headers for test resources.

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aldrin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi.git

commit 5a95f60088c0d868b2db6ef186a7c31cfa1b536d
Author: Aldrin Piri <al...@apache.org>
AuthorDate: Fri Jun 12 16:45:06 2020 -0400

    MINIFI-521 Adding missing license headers for test resources.
    
    Signed-off-by: Aldrin Piri <al...@apache.org>
---
 .../src/test/resources/MINIFI-521_1.3_TemplateEncoding.xml | 14 ++++++++++++++
 .../src/test/resources/MINIFI-521_1.3_TemplateEncoding.yml | 14 ++++++++++++++
 2 files changed, 28 insertions(+)

diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MINIFI-521_1.3_TemplateEncoding.xml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MINIFI-521_1.3_TemplateEncoding.xml
index 1ae143b..38c41e8 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MINIFI-521_1.3_TemplateEncoding.xml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MINIFI-521_1.3_TemplateEncoding.xml
@@ -1,4 +1,18 @@
 <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+   http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+ -->
 <template encoding-version="1.3">
     <description></description>
     <groupId>dacdd7b8-016e-1000-3bfe-7b025a1f4a65</groupId>
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MINIFI-521_1.3_TemplateEncoding.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MINIFI-521_1.3_TemplateEncoding.yml
index 379216c..366846e 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MINIFI-521_1.3_TemplateEncoding.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MINIFI-521_1.3_TemplateEncoding.yml
@@ -1,3 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 MiNiFi Config Version: 3
 Flow Controller:
   name: MiNiFi2