You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2020/06/12 20:51:10 UTC
[nifi-minifi] 01/02: MINIFI-492 - Generic reporting tasks
This is an automated email from the ASF dual-hosted git repository.
aldrin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi.git
commit e44da568d8c099cda7b7fa4db6722d8b66aa9562
Author: Nghia Le <mi...@gmail.com>
AuthorDate: Mon May 25 22:46:14 2020 +0200
MINIFI-492 - Generic reporting tasks
This closes #187.
---
.../minifi/bootstrap/util/ConfigTransformer.java | 68 +++----
.../bootstrap/util/ConfigTransformerTest.java | 26 +++
.../src/test/resources/config-reporting-task.yml | 218 +++++++++++++++++++++
.../nifi/minifi/commons/schema/ConfigSchema.java | 15 ++
.../commons/schema/ProvenanceReportingSchema.java | 42 +++-
.../minifi/commons/schema/ReportingSchema.java | 98 +++++++++
.../commons/schema/common/CommonPropertyKeys.java | 1 +
7 files changed, 426 insertions(+), 42 deletions(-)
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
index da4af51..677080c 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
@@ -38,6 +38,7 @@ import org.apache.nifi.minifi.commons.schema.ProvenanceReportingSchema;
import org.apache.nifi.minifi.commons.schema.ProvenanceRepositorySchema;
import org.apache.nifi.minifi.commons.schema.RemotePortSchema;
import org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema;
+import org.apache.nifi.minifi.commons.schema.ReportingSchema;
import org.apache.nifi.minifi.commons.schema.SecurityPropertiesSchema;
import org.apache.nifi.minifi.commons.schema.SensitivePropsSchema;
import org.apache.nifi.minifi.commons.schema.SwapSchema;
@@ -81,7 +82,6 @@ public final class ConfigTransformer {
// Underlying version of NIFI will be using
public static final String NIFI_VERSION = "1.8.0";
public static final String ROOT_GROUP = "Root-Group";
- public static final String DEFAULT_PROV_REPORTING_TASK_CLASS = "org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask";
public static final String NIFI_VERSION_KEY = "nifi.version";
public static final Logger logger = LoggerFactory.getLogger(ConfigTransformer.class);
@@ -178,7 +178,7 @@ public final class ConfigTransformer {
ProvenanceRepositorySchema provenanceRepositorySchema = configSchema.getProvenanceRepositorySchema();
OrderedProperties orderedProperties = new OrderedProperties();
- orderedProperties.setProperty(NIFI_VERSION_KEY, NIFI_VERSION,"# Core Properties #" + System.lineSeparator());
+ orderedProperties.setProperty(NIFI_VERSION_KEY, NIFI_VERSION, "# Core Properties #" + System.lineSeparator());
orderedProperties.setProperty("nifi.flow.configuration.file", "./conf/flow.xml.gz");
orderedProperties.setProperty("nifi.flow.configuration.archive.enabled", "false");
orderedProperties.setProperty("nifi.flow.configuration.archive.dir", "./conf/archive/");
@@ -288,7 +288,7 @@ public final class ConfigTransformer {
}
}
- protected static DOMSource createFlowXml(ConfigSchema configSchema) throws IOException, ConfigurationChangeException, ConfigTransformerException{
+ protected static DOMSource createFlowXml(ConfigSchema configSchema) throws IOException, ConfigurationChangeException, ConfigTransformerException {
try {
// create a new, empty document
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
@@ -317,9 +317,10 @@ public final class ConfigTransformer {
addProcessGroup(doc, element, processGroupSchema, new ParentGroupIdResolver(processGroupSchema));
SecurityPropertiesSchema securityProperties = configSchema.getSecurityProperties();
- if (securityProperties.useSSL()) {
+ boolean useSSL = securityProperties.useSSL();
+ if (useSSL) {
Element controllerServicesNode = doc.getElementById("controllerServices");
- if(controllerServicesNode == null) {
+ if (controllerServicesNode == null) {
controllerServicesNode = doc.createElement("controllerServices");
}
@@ -327,17 +328,27 @@ public final class ConfigTransformer {
addSSLControllerService(controllerServicesNode, securityProperties);
}
+ List<ReportingSchema> reportingTasks = configSchema.getReportingTasksSchema();
ProvenanceReportingSchema provenanceProperties = configSchema.getProvenanceReportingProperties();
if (provenanceProperties != null) {
+ provenanceProperties.setSSL(useSSL);
+ ReportingSchema provenance = provenanceProperties.convert();
+ provenance.setId("Provenance-Reporting");
+ provenance.setName("Site-To-Site-Provenance-Reporting");
+ reportingTasks.add(provenance);
+ }
+ if (reportingTasks != null) {
final Element reportingTasksNode = doc.createElement("reportingTasks");
rootNode.appendChild(reportingTasksNode);
- addProvenanceReportingTask(reportingTasksNode, configSchema);
+ for (ReportingSchema task : reportingTasks) {
+ addReportingTask(reportingTasksNode, task);
+ }
}
return new DOMSource(doc);
} catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException e) {
throw new ConfigTransformerException(e);
- } catch (Exception e){
+ } catch (Exception e) {
throw new ConfigTransformerException("Failed to parse the config YAML while writing the top level of the flow xml", e);
}
}
@@ -384,7 +395,7 @@ public final class ConfigTransformer {
addConfiguration(serviceElement, attributes);
String annotationData = controllerServiceSchema.getAnnotationData();
- if(annotationData != null && !annotationData.isEmpty()) {
+ if (annotationData != null && !annotationData.isEmpty()) {
addTextElement(element, "annotationData", annotationData);
}
@@ -480,7 +491,7 @@ public final class ConfigTransformer {
addTextElement(element, "runDurationNanos", String.valueOf(processorConfig.getRunDurationNanos()));
String annotationData = processorConfig.getAnnotationData();
- if(annotationData != null && !annotationData.isEmpty()) {
+ if (annotationData != null && !annotationData.isEmpty()) {
addTextElement(element, "annotationData", annotationData);
}
@@ -508,34 +519,23 @@ public final class ConfigTransformer {
addPosition(element);
}
- protected static void addProvenanceReportingTask(final Element element, ConfigSchema configSchema) throws ConfigurationChangeException {
+ protected static void addReportingTask(final Element parentElement, ReportingSchema reportingSchema) throws ConfigurationChangeException {
try {
- ProvenanceReportingSchema provenanceProperties = configSchema.getProvenanceReportingProperties();
- final Element taskElement = element.getOwnerDocument().createElement("reportingTask");
- addTextElement(taskElement, "id", "Provenance-Reporting");
- addTextElement(taskElement, "name", "Site-To-Site-Provenance-Reporting");
- addTextElement(taskElement, "comment", provenanceProperties.getComment());
- addTextElement(taskElement, "class", DEFAULT_PROV_REPORTING_TASK_CLASS);
- addTextElement(taskElement, "schedulingPeriod", provenanceProperties.getSchedulingPeriod());
- addTextElement(taskElement, "scheduledState", "RUNNING");
- addTextElement(taskElement, "schedulingStrategy", provenanceProperties.getSchedulingStrategy());
+ final Document doc = parentElement.getOwnerDocument();
+ final Element element = doc.createElement("reportingTask");
+ parentElement.appendChild(element);
- Map<String, Object> attributes = new HashMap<>();
- attributes.put("Destination URL", provenanceProperties.getDestinationUrl());
- attributes.put("Input Port Name", provenanceProperties.getPortName());
- attributes.put("Instance URL", provenanceProperties.getOriginatingUrl());
- attributes.put("Compress Events", provenanceProperties.getUseCompression());
- attributes.put("Batch Size", provenanceProperties.getBatchSize());
- attributes.put("Communications Timeout", provenanceProperties.getTimeout());
-
- SecurityPropertiesSchema securityProps = configSchema.getSecurityProperties();
- if (securityProps.useSSL()) {
- attributes.put("SSL Context Service", "SSL-Context-Service");
- }
+ addTextElement(element, "id", reportingSchema.getId());
+ addTextElement(element, "name", reportingSchema.getName());
+ addTextElement(element, "comment", reportingSchema.getComment());
+ addTextElement(element, "class", reportingSchema.getReportingClass());
+ addTextElement(element, "schedulingPeriod", reportingSchema.getSchedulingPeriod());
+ addTextElement(element, "scheduledState", "RUNNING");
+ addTextElement(element, "schedulingStrategy", reportingSchema.getSchedulingStrategy());
- addConfiguration(taskElement, attributes);
+ addConfiguration(element, reportingSchema.getProperties());
- element.appendChild(taskElement);
+ parentElement.appendChild(element);
} catch (Exception e) {
throw new ConfigurationChangeException("Failed to parse the config YAML while trying to add the Provenance Reporting Task", e);
}
@@ -725,7 +725,7 @@ public final class ConfigTransformer {
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n" +
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
"# See the License for the specific language governing permissions and\n" +
- "# limitations under the License.\n"+
+ "# limitations under the License.\n" +
"\n";
}
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
index 32a5852..88bc318 100644
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
@@ -29,6 +29,7 @@ import org.apache.nifi.minifi.commons.schema.ProcessGroupSchema;
import org.apache.nifi.minifi.commons.schema.ProcessorSchema;
import org.apache.nifi.minifi.commons.schema.RemotePortSchema;
import org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema;
+import org.apache.nifi.minifi.commons.schema.ReportingSchema;
import org.apache.nifi.minifi.commons.schema.common.StringUtil;
import org.apache.nifi.minifi.commons.schema.exception.SchemaLoaderException;
import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
@@ -123,6 +124,11 @@ public class ConfigTransformerTest {
}
@Test
+ public void testReportingTasksTransform() throws Exception {
+ testConfigFileTransform("config-reporting-task.yml");
+ }
+
+ @Test
public void testProcessGroupsTransform() throws Exception {
testConfigFileTransform("config-process-groups.yml");
}
@@ -507,6 +513,7 @@ public class ConfigTransformerTest {
Document document = documentBuilder.parse(new ByteArrayInputStream(outputStream.toByteArray()));
testProcessGroup((Element) xPathFactory.newXPath().evaluate("flowController/rootGroup", document, XPathConstants.NODE), configSchema.getProcessGroupSchema());
+ testReportingTasks((Element) xPathFactory.newXPath().evaluate("flowController/reportingTasks", document, XPathConstants.NODE), configSchema.getReportingTasksSchema());
}
private void testProcessGroup(Element element, ProcessGroupSchema processGroupSchema) throws XPathExpressionException {
@@ -579,6 +586,25 @@ public class ConfigTransformerTest {
testProperties(element, processorSchema.getProperties());
}
+ private void testReportingTasks(Element element, List<ReportingSchema> reportingSchemas) throws XPathExpressionException {
+ NodeList taskElements = (NodeList) xPathFactory.newXPath().evaluate("reportingTask", element, XPathConstants.NODESET);
+ assertEquals(reportingSchemas.size(), taskElements.getLength());
+ for (int i = 0; i < taskElements.getLength(); i++) {
+ testReportingTask((Element) taskElements.item(i), reportingSchemas.get(i));
+ }
+ }
+
+ private void testReportingTask(Element element, ReportingSchema reportingSchema) throws XPathExpressionException {
+ assertEquals(reportingSchema.getId(), getText(element, "id"));
+ assertEquals(reportingSchema.getName(), getText(element, "name"));
+ assertEquals(reportingSchema.getComment(), getText(element, "comment"));
+ assertEquals(reportingSchema.getReportingClass(), getText(element, "class"));
+ assertEquals(reportingSchema.getSchedulingPeriod(), getText(element, "schedulingPeriod"));
+ assertEquals(reportingSchema.getSchedulingStrategy(), getText(element, "schedulingStrategy"));
+
+ testProperties(element, reportingSchema.getProperties());
+ }
+
private void testControllerService(Element element, ControllerServiceSchema controllerServiceSchema) throws XPathExpressionException {
assertEquals(controllerServiceSchema.getId(), getText(element, "id"));
assertEquals(controllerServiceSchema.getName(), getText(element, "name"));
diff --git a/minifi-bootstrap/src/test/resources/config-reporting-task.yml b/minifi-bootstrap/src/test/resources/config-reporting-task.yml
new file mode 100644
index 0000000..b746316
--- /dev/null
+++ b/minifi-bootstrap/src/test/resources/config-reporting-task.yml
@@ -0,0 +1,218 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+MiNiFi Config Version: 3
+
+Flow Controller:
+ name: MiNiFi Flow
+ comment:
+
+Core Properties:
+ flow controller graceful shutdown period: 10 sec
+ flow service write delay interval: 500 ms
+ administrative yield duration: 30 sec
+ bored yield duration: 10 millis
+ max concurrent threads: 1
+
+FlowFile Repository:
+ partitions: 256
+ checkpoint interval: 2 mins
+ always sync: false
+ Swap:
+ threshold: 20000
+ in period: 5 sec
+ in threads: 1
+ out period: 5 sec
+ out threads: 4
+
+Content Repository:
+ content claim max appendable size: 10 MB
+ content claim max flow files: 100
+ always sync: false
+
+Component Status Repository:
+ buffer size: 1440
+ snapshot frequency: 1 min
+
+Security Properties:
+ keystore: ''
+ keystore type: ''
+ keystore password: ''
+ key password: ''
+ truststore: ''
+ truststore type: ''
+ truststore password: ''
+ ssl protocol: ''
+ Sensitive Props:
+ key: ''
+ algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
+ provider: BC
+
+Processors:
+ - name: TailAppLog
+ id: c05f74a3-9f8f-3bb7-bced-f4735f962ec8
+ class: org.apache.nifi.processors.standard.TailFile
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 10 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list:
+ Properties:
+ File to Tail: logs/minifi-app.log
+ Rolling Filename Pattern: minifi-app*
+ Initial Start Position: Beginning of File
+ - name: SplitIntoSingleLines
+ id: afbd6a07-64a0-39b3-953e-65b4678c5177
+ class: org.apache.nifi.processors.standard.SplitText
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 0 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list:
+ - failure
+ - original
+ Properties:
+ Line Split Count: 1
+ Header Line Count: 0
+ Remove Trailing Newlines: true
+ - name: RouteErrors
+ id: c48394b5-39a5-3bd9-bf01-0bff0e0ad28d
+ class: org.apache.nifi.processors.standard.RouteText
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 0 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list:
+ - unmatched
+ - original
+ Properties:
+ Routing Strategy: Route to 'matched' if line matches all conditions
+ Matching Strategy: Contains
+ Character Set: UTF-8
+ Ignore Leading/Trailing Whitespace: true
+ Ignore Case: true
+ Grouping Regular Expression:
+ WALFFR: WriteAheadFlowFileRepository
+ - name: PutFile
+ id: 1d80a4d8-86ae-3994-a988-2d7a46949091
+ class: org.apache.nifi.processors.standard.PutFile
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 0 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list:
+ - failure
+ - success
+ Properties:
+ Directory: ./
+ Conflict Resolution Strategy: replace
+ Create Missing Directories: true
+ Maximum File Count:
+ Last Modified Time:
+ Permissions:
+ Owner:
+ Group:
+
+Connections:
+ - name: TailToSplit
+ id: 0401b747-1dca-31c7-ab4b-cdacf7e6c44b
+ source name: TailAppLog
+ source id: c05f74a3-9f8f-3bb7-bced-f4735f962ec8
+ source relationship names:
+ - success
+ destination name: SplitIntoSingleLines
+ destination id: afbd6a07-64a0-39b3-953e-65b4678c5177
+ max work queue size: 0
+ max work queue data size: 1 MB
+ flowfile expiration: 60 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer
+ - name: SplitToRoute
+ id: 093f9479-e2f0-3eb2-8184-151f42f22f34
+ source name: SplitIntoSingleLines
+ source id: afbd6a07-64a0-39b3-953e-65b4678c5177
+ source relationship names:
+ - splits
+ destination name: RouteErrors
+ destination id: c48394b5-39a5-3bd9-bf01-0bff0e0ad28d
+ max work queue size: 0
+ max work queue data size: 1 MB
+ flowfile expiration: 60 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer
+ - name: RouteToS2S
+ id: aacad63c-7e4c-31c9-888b-111e07c6fd22
+ source name: RouteErrors
+ source id: c48394b5-39a5-3bd9-bf01-0bff0e0ad28d
+ source relationship names:
+ - matched
+ destination name: PutFile
+ destination id: 1d80a4d8-86ae-3994-a988-2d7a46949091
+ max work queue size: 0
+ max work queue data size: 1 MB
+ flowfile expiration: 60 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer
+
+Provenance Reporting:
+ comment: old school config style
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 30 sec
+ port name: provenance
+ destination url: http://localhost:8080/
+ originating url: http://${hostname(true)}:8081/nifi
+ use compression: true
+ timeout: 30 secs
+ batch size: 1000
+
+Reporting Tasks:
+ - name: Viet-Nam
+ class: org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask
+ comment: this sentence has three a’s, two c’s, two d’s, twenty-eight e’s, four f’s, four g’s, ten h’s, eight i’s, two l’s, eleven n’s, six o’s, seven r’s, twenty-seven s’s, eighteen t’s, three u’s, five v’s, six w’s, three x’s, and three y’s.
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 30 sec
+ Properties:
+ Input Port Name: provenance
+ Destination URL: http://localhost:8082/
+ Instance URL: http://${hostname(true)}:8081/nifi
+ Compress Events: true
+ Communications Timeout: 30 secs
+ Batch Size: 1000
+
+ - name: Why-Naming-Is-So-Hard
+ class: org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask
+ comment:
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 30 sec
+ Properties:
+ Input Port Name: provenance
+ Destination URL: http://localhost:8083/
+ Instance URL: http://${hostname(true)}:8081/nifi
+ Compress Events: true
+ Communications Timeout: 30 secs
+ Batch Size: 1000
+
+ - name: Monitor-Memor-inator
+ id: e415284f-0170-1000-1174-52ac9c53c22b
+ class: org.apache.nifi.controller.MonitorMemory
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 5 mins
+ Properties:
+ Usage Threshold: 1%
\ No newline at end of file
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
index ff9c9bf..d6b36ef 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
@@ -36,6 +36,7 @@ import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CO
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CORE_PROPS_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOWFILE_REPO_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOW_CONTROLLER_PROPS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.GENERAL_REPORTING_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NIFI_PROPERTIES_OVERRIDES_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPORTING_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPO_KEY;
@@ -54,6 +55,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
public static final String FOUND_THE_FOLLOWING_DUPLICATE_CONTROLLER_SERVICE_IDS = "Found the following duplicate controller service ids: ";
public static final String FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS = "Found the following duplicate connection ids: ";
public static final String FOUND_THE_FOLLOWING_DUPLICATE_FUNNEL_IDS = "Found the following duplicate funnel ids: ";
+ public static final String FOUND_THE_FOLLOWING_DUPLICATE_REPORTING_IDS = "Found the following duplicate reporting ids: ";
public static String TOP_LEVEL_NAME = "top level";
private FlowControllerSchema flowControllerProperties;
private CorePropertiesSchema coreProperties;
@@ -63,6 +65,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
private SecurityPropertiesSchema securityProperties;
private ProcessGroupSchema processGroupSchema;
private ProvenanceReportingSchema provenanceReportingProperties;
+ private List<ReportingSchema> reportingTasks;
private ProvenanceRepositorySchema provenanceRepositorySchema;
@@ -86,6 +89,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
processGroupSchema = new ProcessGroupSchema(map, TOP_LEVEL_NAME);
provenanceReportingProperties = getMapAsType(map, PROVENANCE_REPORTING_KEY, ProvenanceReportingSchema.class, TOP_LEVEL_NAME, false, false);
+ reportingTasks = getOptionalKeyAsList(map, GENERAL_REPORTING_KEY, ReportingSchema::new, TOP_LEVEL_NAME);
nifiPropertiesOverrides = (Map<String, String>) map.get(NIFI_PROPERTIES_OVERRIDES_KEY);
if (nifiPropertiesOverrides == null) {
@@ -100,6 +104,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
addIssuesIfNotNull(securityProperties);
addIssuesIfNotNull(processGroupSchema);
addIssuesIfNotNull(provenanceReportingProperties);
+ addIssuesIfNotNull(reportingTasks);
addIssuesIfNotNull(provenanceRepositorySchema);
List<ProcessGroupSchema> allProcessGroups = getAllProcessGroups(processGroupSchema);
@@ -117,6 +122,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
.flatMap(r -> r.getOutputPorts().stream()).map(RemotePortSchema::getId).collect(Collectors.toList());
List<String> allInputPortIds = allProcessGroups.stream().flatMap(p -> p.getInputPortSchemas().stream()).map(PortSchema::getId).collect(Collectors.toList());
List<String> allOutputPortIds = allProcessGroups.stream().flatMap(p -> p.getOutputPortSchemas().stream()).map(PortSchema::getId).collect(Collectors.toList());
+ List<String> allReportingIds = reportingTasks.stream().map(ReportingSchema::getId).collect(Collectors.toList());
checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS, allProcessorIds);
checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_CONTROLLER_SERVICE_IDS, allControllerServiceIds);
@@ -124,6 +130,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS, allConnectionIds);
checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_INPUT_PORT_IDS, allInputPortIds);
checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_OUTPUT_PORT_IDS, allOutputPortIds);
+ checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REPORTING_IDS, allReportingIds);
// Potential connection sources and destinations need to have unique ids
CollectionOverlap<String> overlapResults = new CollectionOverlap<>(new HashSet<>(allProcessorIds), new HashSet<>(allRemoteInputPortIds), new HashSet<>(allRemoteOutputPortIds),
@@ -167,6 +174,10 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
putIfNotNull(result, SECURITY_PROPS_KEY, securityProperties);
result.putAll(processGroupSchema.toMap());
putIfNotNull(result, PROVENANCE_REPORTING_KEY, provenanceReportingProperties);
+ if (!reportingTasks.isEmpty()) {
+ // for backward compatibility
+ putListIfNotNull(result, GENERAL_REPORTING_KEY, reportingTasks);
+ }
result.put(NIFI_PROPERTIES_OVERRIDES_KEY, nifiPropertiesOverrides);
return result;
}
@@ -207,6 +218,10 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
return provenanceReportingProperties;
}
+ public List<ReportingSchema> getReportingTasksSchema() {
+ return reportingTasks;
+ }
+
public ComponentStatusRepositorySchema getComponentStatusRepositoryProperties() {
return componentStatusRepositoryProperties;
}
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java
index d197ccb..882081b 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java
@@ -21,19 +21,20 @@ import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
import org.apache.nifi.minifi.commons.schema.common.WritableSchema;
import org.apache.nifi.scheduling.SchedulingStrategy;
+import java.util.HashMap;
import java.util.Map;
+import static org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema.TIMEOUT_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CLASS_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMMENT_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROPERTIES_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPORTING_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_PERIOD_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_STRATEGY_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.USE_COMPRESSION_KEY;
-import static org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema.TIMEOUT_KEY;
-/**
- *
- */
public class ProvenanceReportingSchema extends BaseSchema implements WritableSchema {
+ public static final String DEFAULT_PROV_REPORTING_TASK_CLASS = "org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask";
public static final String DESTINATION_URL_KEY = "destination url";
public static final String PORT_NAME_KEY = "port name";
public static final String ORIGINATING_URL_KEY = "originating url";
@@ -54,6 +55,7 @@ public class ProvenanceReportingSchema extends BaseSchema implements WritableSch
private Boolean useCompression = DEFAULT_USE_COMPRESSION;
private String timeout = DEFAULT_TIMEOUT;
private Number batchSize = DEFAULT_BATCH_SIZE;
+ private String SSL;
public ProvenanceReportingSchema(Map map) {
schedulingStrategy = getRequiredKeyAsType(map, SCHEDULING_STRATEGY_KEY, String.class, PROVENANCE_REPORTING_KEY);
@@ -64,18 +66,38 @@ public class ProvenanceReportingSchema extends BaseSchema implements WritableSch
addValidationIssue(SCHEDULING_STRATEGY_KEY, PROVENANCE_REPORTING_KEY, "it is not a valid scheduling strategy");
}
}
-
schedulingPeriod = getRequiredKeyAsType(map, SCHEDULING_PERIOD_KEY, String.class, PROVENANCE_REPORTING_KEY);
- destinationUrl = getRequiredKeyAsType(map, DESTINATION_URL_KEY, String.class, PROVENANCE_REPORTING_KEY);
- portName = getRequiredKeyAsType(map, PORT_NAME_KEY, String.class, PROVENANCE_REPORTING_KEY);
-
comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, PROVENANCE_REPORTING_KEY, "");
+
originatingUrl = getOptionalKeyAsType(map, ORIGINATING_URL_KEY, String.class, PROVENANCE_REPORTING_KEY, DEFAULT_ORGINATING_URL);
+ destinationUrl = getRequiredKeyAsType(map, DESTINATION_URL_KEY, String.class, PROVENANCE_REPORTING_KEY);
+ portName = getRequiredKeyAsType(map, PORT_NAME_KEY, String.class, PROVENANCE_REPORTING_KEY);
useCompression = getOptionalKeyAsType(map, USE_COMPRESSION_KEY, Boolean.class, PROVENANCE_REPORTING_KEY, DEFAULT_USE_COMPRESSION);
timeout = getOptionalKeyAsType(map, TIMEOUT_KEY, String.class, PROVENANCE_REPORTING_KEY, DEFAULT_TIMEOUT);
batchSize = getOptionalKeyAsType(map, BATCH_SIZE_KEY, Number.class, PROVENANCE_REPORTING_KEY, DEFAULT_BATCH_SIZE);
}
+ public ReportingSchema convert() {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put("Destination URL", destinationUrl);
+ properties.put("Input Port Name", portName);
+ properties.put("Instance URL", originatingUrl);
+ properties.put("Compress Events", useCompression);
+ properties.put("Batch Size", batchSize);
+ properties.put("Communications Timeout", timeout);
+ properties.put("SSL Context Service", SSL);
+
+ Map<String, Object> target = super.mapSupplier.get();
+ target.put(CLASS_KEY, DEFAULT_PROV_REPORTING_TASK_CLASS);
+ target.put(COMMENT_KEY, comment);
+ target.put(SCHEDULING_STRATEGY_KEY, schedulingStrategy);
+ target.put(SCHEDULING_PERIOD_KEY, schedulingPeriod);
+ target.put(PROPERTIES_KEY, properties);
+
+ ReportingSchema provenance = new ReportingSchema(target);
+ return provenance;
+ }
+
@Override
public Map<String, Object> toMap() {
Map<String, Object> result = mapSupplier.get();
@@ -91,6 +113,10 @@ public class ProvenanceReportingSchema extends BaseSchema implements WritableSch
return result;
}
+ public void setSSL(boolean useSSL) {
+ SSL = useSSL ? "SSL-Context-Service" : "";
+ }
+
public String getComment() {
return comment;
}
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ReportingSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ReportingSchema.java
new file mode 100644
index 0000000..a833b57
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ReportingSchema.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.BaseSchemaWithIdAndName;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CLASS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMMENT_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.DEFAULT_PROPERTIES;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROPERTIES_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_PERIOD_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_STRATEGY_KEY;
+import static org.apache.nifi.minifi.commons.schema.ProcessorSchema.IT_IS_NOT_A_VALID_SCHEDULING_STRATEGY;
+
+public class ReportingSchema extends BaseSchemaWithIdAndName {
+ private String schedulingStrategy;
+ private String schedulingPeriod;
+ private String comment;
+
+ private String reportingClass;
+ private Map<String, Object> properties = DEFAULT_PROPERTIES;
+
+ public ReportingSchema(Map map) {
+ super(map, "Reporting(id: {id}, name: {name})");
+ if (this.getId().equals("")) {
+ // MiNiFi will throw an error if it can not find `id` of BaseSchemaWithIdAndName for YML config version 3
+ this.setId(UUID.randomUUID().toString());
+ }
+ String wrapperName = getWrapperName();
+ reportingClass = getRequiredKeyAsType(map, CLASS_KEY, String.class, wrapperName);
+ schedulingStrategy = getRequiredKeyAsType(map, SCHEDULING_STRATEGY_KEY, String.class, wrapperName);
+ if (schedulingStrategy != null && !isSchedulingStrategy(schedulingStrategy)) {
+ addValidationIssue(SCHEDULING_STRATEGY_KEY, wrapperName, IT_IS_NOT_A_VALID_SCHEDULING_STRATEGY);
+ }
+ schedulingPeriod = getRequiredKeyAsType(map, SCHEDULING_PERIOD_KEY, String.class, wrapperName);
+ comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, wrapperName, "");
+ properties = getOptionalKeyAsType(map, PROPERTIES_KEY, Map.class, wrapperName, DEFAULT_PROPERTIES);
+ }
+
+ public static boolean isSchedulingStrategy(String string) {
+ try {
+ SchedulingStrategy.valueOf(string);
+ } catch (Exception e) {
+ return false;
+ }
+ return true;
+ }
+
+ public Map<String, Object> toMap() {
+ Map<String, Object> result = super.toMap();
+ result.put(CLASS_KEY, reportingClass);
+ result.put(COMMENT_KEY, comment);
+ result.put(SCHEDULING_STRATEGY_KEY, schedulingStrategy);
+ result.put(SCHEDULING_PERIOD_KEY, schedulingPeriod);
+ result.put(PROPERTIES_KEY, new HashMap<>(properties));
+ return result;
+ }
+
+ public String getComment() {
+ return comment;
+ }
+
+ public String getSchedulingStrategy() {
+ return schedulingStrategy;
+ }
+
+ public String getSchedulingPeriod() {
+ return schedulingPeriod;
+ }
+
+ public String getReportingClass() {
+ return reportingClass;
+ }
+
+ public Map<String, Object> getProperties() {
+ return properties;
+ }
+}
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java
index c52f7fe..99d0086 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java
@@ -32,6 +32,7 @@ public class CommonPropertyKeys {
public static final String PROCESSORS_KEY = "Processors";
public static final String CONNECTIONS_KEY = "Connections";
public static final String PROVENANCE_REPORTING_KEY = "Provenance Reporting";
+ public static final String GENERAL_REPORTING_KEY = "Reporting Tasks";
public static final String REMOTE_PROCESS_GROUPS_KEY = "Remote Process Groups";
public static final String INPUT_PORTS_KEY = "Input Ports";
public static final String OUTPUT_PORTS_KEY = "Output Ports";