You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/11/09 21:49:14 UTC
[2/2] nifi-minifi git commit: MINIFI-47 - Funnel support
MINIFI-47 - Funnel support
This closes #52
Signed-off-by: Joseph Percivall <JP...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/12a58a86
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/12a58a86
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/12a58a86
Branch: refs/heads/master
Commit: 12a58a8698c3f661d14156c83c8ea0de2f7a74dd
Parents: 31855bb
Author: Bryan Rosander <br...@apache.org>
Authored: Thu Nov 3 11:14:56 2016 -0400
Committer: Joseph Percivall <JP...@apache.org>
Committed: Wed Nov 9 16:48:48 2016 -0500
----------------------------------------------------------------------
.../bootstrap/util/ConfigTransformer.java | 25 +-
.../bootstrap/util/ParentGroupIdResolver.java | 12 +-
.../bootstrap/util/ConfigTransformerTest.java | 22 +-
.../util/ParentGroupIdResolverTest.java | 31 ++
.../resources/stress-test-framework-funnel.yml | 211 ++++++++
.../minifi/commons/schema/ConfigSchema.java | 6 +-
.../minifi/commons/schema/FunnelSchema.java | 30 ++
.../commons/schema/ProcessGroupSchema.java | 14 +
.../commons/schema/common/BaseSchemaWithId.java | 73 +++
.../schema/common/BaseSchemaWithIdAndName.java | 41 +-
.../schema/common/CommonPropertyKeys.java | 1 +
.../src/main/markdown/System_Admin_Guide.md | 9 +
.../src/main/resources/conf/config.yml | 1 +
.../toolkit/configuration/ConfigMain.java | 18 +-
.../configuration/dto/ConfigSchemaFunction.java | 19 +-
.../configuration/dto/FunnelSchemaFunction.java | 38 ++
.../toolkit/configuration/ConfigMainTest.java | 14 +-
.../src/test/resources/CsvToJson.yml | 1 +
.../resources/DecompressionCircularFlow.yml | 1 +
.../resources/InvokeHttpMiNiFiTemplateTest.yml | 1 +
.../test/resources/MultipleRelationships.yml | 1 +
.../ProcessGroupsAndRemoteProcessGroups.yml | 3 +
...aceTextExpressionLanguageCSVReformatting.yml | 1 +
.../src/test/resources/StressTestFramework.yml | 1 +
.../resources/StressTestFrameworkFunnel.xml | 540 +++++++++++++++++++
.../resources/StressTestFrameworkFunnel.yml | 211 ++++++++
26 files changed, 1259 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
index 9fa7f05..19d3268 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
@@ -27,6 +27,7 @@ import org.apache.nifi.minifi.commons.schema.ContentRepositorySchema;
import org.apache.nifi.minifi.commons.schema.CorePropertiesSchema;
import org.apache.nifi.minifi.commons.schema.FlowControllerSchema;
import org.apache.nifi.minifi.commons.schema.FlowFileRepositorySchema;
+import org.apache.nifi.minifi.commons.schema.FunnelSchema;
import org.apache.nifi.minifi.commons.schema.PortSchema;
import org.apache.nifi.minifi.commons.schema.ProcessGroupSchema;
import org.apache.nifi.minifi.commons.schema.ProcessorSchema;
@@ -369,6 +370,10 @@ public final class ConfigTransformer {
addRemoteProcessGroup(element, remoteProcessingGroupSchema);
}
+ for (FunnelSchema funnelSchema : processGroupSchema.getFunnels()) {
+ addFunnel(element, funnelSchema);
+ }
+
for (PortSchema portSchema : processGroupSchema.getInputPortSchemas()) {
addPort(doc, element, portSchema, "inputPort");
}
@@ -448,6 +453,17 @@ public final class ConfigTransformer {
}
}
+
+ protected static void addFunnel(final Element parentElement, FunnelSchema funnelSchema) {
+ Document document = parentElement.getOwnerDocument();
+ Element element = document.createElement("funnel");
+ parentElement.appendChild(element);
+
+ addTextElement(element, "id", funnelSchema.getId());
+
+ addPosition(element);
+ }
+
protected static void addProvenanceReportingTask(final Element element, ConfigSchema configSchema) throws ConfigurationChangeException {
try {
ProvenanceReportingSchema provenanceProperties = configSchema.getProvenanceReportingProperties();
@@ -605,8 +621,13 @@ public final class ConfigTransformer {
if (parentId != null) {
type = "OUTPUT_PORT";
} else {
- parentId = parentGroupIdResolver.getProcessorParentId(id);
- type = "PROCESSOR";
+ parentId = parentGroupIdResolver.getFunnelParentId(id);
+ if (parentId != null) {
+ type = "FUNNEL";
+ } else {
+ parentId = parentGroupIdResolver.getProcessorParentId(id);
+ type = "PROCESSOR";
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolver.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolver.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolver.java
index 71088ee..05aa27d 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolver.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolver.java
@@ -22,7 +22,7 @@ package org.apache.nifi.minifi.bootstrap.util;
import org.apache.nifi.minifi.commons.schema.ProcessGroupSchema;
import org.apache.nifi.minifi.commons.schema.RemoteInputPortSchema;
import org.apache.nifi.minifi.commons.schema.RemoteProcessingGroupSchema;
-import org.apache.nifi.minifi.commons.schema.common.BaseSchemaWithIdAndName;
+import org.apache.nifi.minifi.commons.schema.common.BaseSchemaWithId;
import java.util.Collection;
import java.util.HashMap;
@@ -33,23 +33,25 @@ public class ParentGroupIdResolver {
private final Map<String, String> processorIdToParentIdMap;
private final Map<String, String> inputPortIdToParentIdMap;
private final Map<String, String> outputPortIdToParentIdMap;
+ private final Map<String, String> funnelIdToParentIdMap;
private final Map<String, String> remoteInputPortIdToParentIdMap;
public ParentGroupIdResolver(ProcessGroupSchema processGroupSchema) {
this.processorIdToParentIdMap = getParentIdMap(processGroupSchema, ProcessGroupSchema::getProcessors);
this.inputPortIdToParentIdMap = getParentIdMap(processGroupSchema, ProcessGroupSchema::getInputPortSchemas);
this.outputPortIdToParentIdMap = getParentIdMap(processGroupSchema, ProcessGroupSchema::getOutputPortSchemas);
+ this.funnelIdToParentIdMap = getParentIdMap(processGroupSchema, ProcessGroupSchema::getFunnels);
this.remoteInputPortIdToParentIdMap = getRemoteInputPortParentIdMap(processGroupSchema);
}
- protected static Map<String, String> getParentIdMap(ProcessGroupSchema processGroupSchema, Function<ProcessGroupSchema, Collection<? extends BaseSchemaWithIdAndName>> schemaAccessor) {
+ protected static Map<String, String> getParentIdMap(ProcessGroupSchema processGroupSchema, Function<ProcessGroupSchema, Collection<? extends BaseSchemaWithId>> schemaAccessor) {
Map<String, String> map = new HashMap<>();
getParentIdMap(processGroupSchema, map, schemaAccessor);
return map;
}
protected static void getParentIdMap(ProcessGroupSchema processGroupSchema, Map<String, String> output, Function<ProcessGroupSchema,
- Collection<? extends BaseSchemaWithIdAndName>> schemaAccessor) {
+ Collection<? extends BaseSchemaWithId>> schemaAccessor) {
schemaAccessor.apply(processGroupSchema).forEach(p -> output.put(p.getId(), processGroupSchema.getId()));
processGroupSchema.getProcessGroupSchemas().forEach(p -> getParentIdMap(p, output, schemaAccessor));
}
@@ -84,4 +86,8 @@ public class ParentGroupIdResolver {
public String getProcessorParentId(String id) {
return processorIdToParentIdMap.get(id);
}
+
+ public String getFunnelParentId(String id) {
+ return funnelIdToParentIdMap.get(id);
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
index cac9d16..b41dc90 100644
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
@@ -20,6 +20,7 @@ package org.apache.nifi.minifi.bootstrap.util;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
import org.apache.nifi.minifi.commons.schema.ConfigSchema;
import org.apache.nifi.minifi.commons.schema.ConnectionSchema;
+import org.apache.nifi.minifi.commons.schema.FunnelSchema;
import org.apache.nifi.minifi.commons.schema.PortSchema;
import org.apache.nifi.minifi.commons.schema.ProcessGroupSchema;
import org.apache.nifi.minifi.commons.schema.ProcessorSchema;
@@ -100,7 +101,16 @@ public class ConfigTransformerTest {
@Test
public void testProcessGroupsTransform() throws Exception {
- ConfigSchema configSchema = SchemaLoader.loadConfigSchemaFromYaml(ConfigTransformerTest.class.getClassLoader().getResourceAsStream("config-process-groups.yml"));
+ testConfigFileTransform("config-process-groups.yml");
+ }
+
+ @Test
+ public void testFunnelsTransform() throws Exception {
+ testConfigFileTransform("stress-test-framework-funnel.yml");
+ }
+
+ public void testConfigFileTransform(String configFile) throws Exception {
+ ConfigSchema configSchema = SchemaLoader.loadConfigSchemaFromYaml(ConfigTransformerTest.class.getClassLoader().getResourceAsStream(configFile));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ConfigTransformer.writeFlowXmlFile(configSchema, outputStream);
@@ -126,6 +136,12 @@ public class ConfigTransformerTest {
testRemoteProcessGroups((Element) remoteProcessGroupElements.item(i), processGroupSchema.getRemoteProcessingGroups().get(i));
}
+ NodeList funnelElements = (NodeList) xPathFactory.newXPath().evaluate("funnel", element, XPathConstants.NODESET);
+ assertEquals(processGroupSchema.getFunnels().size(), funnelElements.getLength());
+ for (int i = 0; i < funnelElements.getLength(); i++) {
+ testFunnel((Element) funnelElements.item(i), processGroupSchema.getFunnels().get(i));
+ }
+
NodeList inputPortElements = (NodeList) xPathFactory.newXPath().evaluate("inputPort", element, XPathConstants.NODESET);
assertEquals(processGroupSchema.getInputPortSchemas().size(), inputPortElements.getLength());
for (int i = 0; i < inputPortElements.getLength(); i++) {
@@ -195,6 +211,10 @@ public class ConfigTransformerTest {
assertEquals("RUNNING", getText(element, "scheduledState"));
}
+ private void testFunnel(Element element, FunnelSchema funnelSchema) throws XPathExpressionException {
+ assertEquals(funnelSchema.getId(), getText(element, "id"));
+ }
+
private void testConnection(Element element, ConnectionSchema connectionSchema) throws XPathExpressionException {
assertEquals(connectionSchema.getId(), getText(element, "id"));
assertEquals(connectionSchema.getName(), getText(element, "name"));
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolverTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolverTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolverTest.java
index 6f557b7..c67d250 100644
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolverTest.java
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolverTest.java
@@ -57,6 +57,8 @@ public class ParentGroupIdResolverTest {
assertNull(parentGroupIdResolver.getOutputPortParentId("two"));
assertNull(parentGroupIdResolver.getProcessorParentId("one"));
assertNull(parentGroupIdResolver.getProcessorParentId("two"));
+ assertNull(parentGroupIdResolver.getFunnelParentId("one"));
+ assertNull(parentGroupIdResolver.getFunnelParentId("two"));
}
@Test
@@ -78,6 +80,8 @@ public class ParentGroupIdResolverTest {
assertNull(parentGroupIdResolver.getOutputPortParentId("two"));
assertNull(parentGroupIdResolver.getProcessorParentId("one"));
assertNull(parentGroupIdResolver.getProcessorParentId("two"));
+ assertNull(parentGroupIdResolver.getFunnelParentId("one"));
+ assertNull(parentGroupIdResolver.getFunnelParentId("two"));
}
@Test
@@ -99,6 +103,8 @@ public class ParentGroupIdResolverTest {
assertEquals("pgTwo", parentGroupIdResolver.getOutputPortParentId("two"));
assertNull(parentGroupIdResolver.getProcessorParentId("one"));
assertNull(parentGroupIdResolver.getProcessorParentId("two"));
+ assertNull(parentGroupIdResolver.getFunnelParentId("one"));
+ assertNull(parentGroupIdResolver.getFunnelParentId("two"));
}
@Test
@@ -120,6 +126,31 @@ public class ParentGroupIdResolverTest {
assertNull(parentGroupIdResolver.getOutputPortParentId("two"));
assertEquals(ConfigTransformer.ROOT_GROUP, parentGroupIdResolver.getProcessorParentId("one"));
assertEquals("pgTwo", parentGroupIdResolver.getProcessorParentId("two"));
+ assertNull(parentGroupIdResolver.getFunnelParentId("one"));
+ assertNull(parentGroupIdResolver.getFunnelParentId("two"));
+ }
+
+ @Test
+ public void testFunnelParentId() throws IOException, SchemaLoaderException {
+ List<String> configLines = new ArrayList<>();
+ configLines.add("MiNiFi Config Version: 2");
+ configLines.add("Funnels:");
+ configLines.add("- id: one");
+ configLines.add("Process Groups:");
+ configLines.add("- id: pgTwo");
+ configLines.add(" Funnels:");
+ configLines.add(" - id: two");
+ ParentGroupIdResolver parentGroupIdResolver = createParentGroupIdResolver(configLines);
+ assertNull(parentGroupIdResolver.getRemoteInputPortParentId("one"));
+ assertNull(parentGroupIdResolver.getRemoteInputPortParentId("two"));
+ assertNull(parentGroupIdResolver.getInputPortParentId("one"));
+ assertNull(parentGroupIdResolver.getInputPortParentId("two"));
+ assertNull(parentGroupIdResolver.getOutputPortParentId("one"));
+ assertNull(parentGroupIdResolver.getOutputPortParentId("two"));
+ assertNull(parentGroupIdResolver.getProcessorParentId("one"));
+ assertNull(parentGroupIdResolver.getProcessorParentId("two"));
+ assertEquals(ConfigTransformer.ROOT_GROUP, parentGroupIdResolver.getFunnelParentId("one"));
+ assertEquals("pgTwo", parentGroupIdResolver.getFunnelParentId("two"));
}
private ParentGroupIdResolver createParentGroupIdResolver(List<String> configLines) throws IOException, SchemaLoaderException {
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-bootstrap/src/test/resources/stress-test-framework-funnel.yml
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/resources/stress-test-framework-funnel.yml b/minifi-bootstrap/src/test/resources/stress-test-framework-funnel.yml
new file mode 100644
index 0000000..bbbda3a
--- /dev/null
+++ b/minifi-bootstrap/src/test/resources/stress-test-framework-funnel.yml
@@ -0,0 +1,211 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+MiNiFi Config Version: 2
+Flow Controller:
+ name: StressTestFramework
+ comment: ''
+Core Properties:
+ flow controller graceful shutdown period: 10 sec
+ flow service write delay interval: 500 ms
+ administrative yield duration: 30 sec
+ bored yield duration: 10 millis
+ max concurrent threads: 1
+FlowFile Repository:
+ partitions: 256
+ checkpoint interval: 2 mins
+ always sync: false
+ Swap:
+ threshold: 20000
+ in period: 5 sec
+ in threads: 1
+ out period: 5 sec
+ out threads: 4
+Content Repository:
+ content claim max appendable size: 10 MB
+ content claim max flow files: 100
+ always sync: false
+Provenance Repository:
+ provenance rollover time: 1 min
+Component Status Repository:
+ buffer size: 1440
+ snapshot frequency: 1 min
+Security Properties:
+ keystore: ''
+ keystore type: ''
+ keystore password: ''
+ key password: ''
+ truststore: ''
+ truststore type: ''
+ truststore password: ''
+ ssl protocol: ''
+ Sensitive Props:
+ key:
+ algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
+ provider: BC
+Processors:
+- name: GenerateFlowFile
+ id: 16a47794-5391-4ad2-0000-000000000000
+ class: org.apache.nifi.processors.standard.GenerateFlowFile
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 0 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list: []
+ Properties:
+ Batch Size: '100'
+ Data Format: Binary
+ File Size: 0 KB
+ Unique FlowFiles: 'false'
+- name: GenerateFlowFile
+ id: 4ad21391-16a4-1794-0000-000000000000
+ class: org.apache.nifi.processors.standard.GenerateFlowFile
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 0 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list: []
+ Properties:
+ Batch Size: '100'
+ Data Format: Binary
+ File Size: 0 KB
+ Unique FlowFiles: 'false'
+- name: GenerateFlowFile
+ id: 53914ad2-7794-16a4-0000-000000000000
+ class: org.apache.nifi.processors.standard.GenerateFlowFile
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 0 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list: []
+ Properties:
+ Batch Size: '100'
+ Data Format: Binary
+ File Size: 0 KB
+ Unique FlowFiles: 'false'
+- name: GenerateFlowFile
+ id: 779416a4-4ad2-1391-0000-000000000000
+ class: org.apache.nifi.processors.standard.GenerateFlowFile
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 0 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list: []
+ Properties:
+ Batch Size: '100'
+ Data Format: Binary
+ File Size: 0 KB
+ Unique FlowFiles: 'false'
+- name: RouteOnAttribute
+ id: 397a4910-cc01-4c6b-0000-000000000000
+ class: org.apache.nifi.processors.standard.RouteOnAttribute
+ max concurrent tasks: 2
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 0 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 50000
+ auto-terminated relationships list:
+ - unmatched
+ Properties:
+ Routing Strategy: Route to Property name
+- name: UpdateAttribute
+ id: 92557c76-f251-45a4-0000-000000000000
+ class: org.apache.nifi.processors.attributes.UpdateAttribute
+ max concurrent tasks: 2
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 0 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 100000
+ auto-terminated relationships list: []
+ Properties:
+ Delete Attributes Expression:
+ property 1: value 1
+ property 2: value 2 ${nextInt()}
+Process Groups: []
+Input Ports: []
+Output Ports: []
+Funnels:
+- id: 4ad21392-16a4-1794-0000-000000000000
+Connections:
+- name: 4ad21392-16a4-1794-0000-000000000000//UpdateAttribute
+ id: 4ad21397-16a4-1794-0000-000000000000
+ source id: 4ad21392-16a4-1794-0000-000000000000
+ source relationship names: []
+ destination id: 92557c76-f251-45a4-0000-000000000000
+ max work queue size: 10000
+ max work queue data size: 1 GB
+ flowfile expiration: 0 sec
+ queue prioritizer class: ''
+- name: GenerateFlowFile/success/4ad21392-16a4-1794-0000-000000000000
+ id: 4ad21393-16a4-1794-0000-000000000000
+ source id: 16a47794-5391-4ad2-0000-000000000000
+ source relationship names:
+ - success
+ destination id: 4ad21392-16a4-1794-0000-000000000000
+ max work queue size: 10000
+ max work queue data size: 1 GB
+ flowfile expiration: 0 sec
+ queue prioritizer class: ''
+- name: GenerateFlowFile/success/4ad21392-16a4-1794-0000-000000000000
+ id: 4ad21394-16a4-1794-0000-000000000000
+ source id: 53914ad2-7794-16a4-0000-000000000000
+ source relationship names:
+ - success
+ destination id: 4ad21392-16a4-1794-0000-000000000000
+ max work queue size: 10000
+ max work queue data size: 1 GB
+ flowfile expiration: 0 sec
+ queue prioritizer class: ''
+- name: GenerateFlowFile/success/4ad21392-16a4-1794-0000-000000000000
+ id: 4ad21395-16a4-1794-0000-000000000000
+ source id: 779416a4-4ad2-1391-0000-000000000000
+ source relationship names:
+ - success
+ destination id: 4ad21392-16a4-1794-0000-000000000000
+ max work queue size: 10000
+ max work queue data size: 1 GB
+ flowfile expiration: 0 sec
+ queue prioritizer class: ''
+- name: GenerateFlowFile/success/4ad21392-16a4-1794-0000-000000000000
+ id: 4ad21396-16a4-1794-0000-000000000000
+ source id: 4ad21391-16a4-1794-0000-000000000000
+ source relationship names:
+ - success
+ destination id: 4ad21392-16a4-1794-0000-000000000000
+ max work queue size: 10000
+ max work queue data size: 1 GB
+ flowfile expiration: 0 sec
+ queue prioritizer class: ''
+- name: UpdateAttribute/success/RouteOnAttribute
+ id: 4c53556e-eb46-458c-0000-000000000000
+ source id: 92557c76-f251-45a4-0000-000000000000
+ source relationship names:
+ - success
+ destination id: 397a4910-cc01-4c6b-0000-000000000000
+ max work queue size: 2000
+ max work queue data size: 0 MB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+Remote Processing Groups: []
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
index abd6a6c..333adf8 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
@@ -53,6 +53,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
public static final String HAS_INVALID_DESTINATION_ID = " has invalid destination id ";
public static final String FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS = "Found the following duplicate processor ids: ";
public static final String FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS = "Found the following duplicate connection ids: ";
+ public static final String FOUND_THE_FOLLOWING_DUPLICATE_FUNNEL_IDS = "Found the following duplicate funnel ids: ";
public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES = "Found the following duplicate remote processing group names: ";
public static String TOP_LEVEL_NAME = "top level";
private FlowControllerSchema flowControllerProperties;
@@ -100,6 +101,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
List<RemoteProcessingGroupSchema> allRemoteProcessingGroups = allProcessGroups.stream().flatMap(p -> p.getRemoteProcessingGroups().stream()).collect(Collectors.toList());
List<String> allProcessorIds = allProcessGroups.stream().flatMap(p -> p.getProcessors().stream()).map(ProcessorSchema::getId).collect(Collectors.toList());
+ List<String> allFunnelIds = allProcessGroups.stream().flatMap(p -> p.getFunnels().stream()).map(FunnelSchema::getId).collect(Collectors.toList());
List<String> allConnectionIds = allConnectionSchemas.stream().map(ConnectionSchema::getId).collect(Collectors.toList());
List<String> allRemoteProcessingGroupNames = allRemoteProcessingGroups.stream().map(RemoteProcessingGroupSchema::getName).collect(Collectors.toList());
List<String> allRemoteInputPortIds = allRemoteProcessingGroups.stream().filter(r -> r.getInputPorts() != null)
@@ -108,6 +110,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
List<String> allOutputPortIds = allProcessGroups.stream().flatMap(p -> p.getOutputPortSchemas().stream()).map(PortSchema::getId).collect(Collectors.toList());
checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS, allProcessorIds);
+ checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_FUNNEL_IDS, allFunnelIds);
checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS, allConnectionIds);
checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES, allRemoteProcessingGroupNames);
checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS, allRemoteInputPortIds);
@@ -115,7 +118,8 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_OUTPUT_PORT_IDS, allOutputPortIds);
// Potential connection sources and destinations need to have unique ids
- OverlapResults<String> overlapResults = findOverlap(new HashSet<>(allProcessorIds), new HashSet<>(allRemoteInputPortIds), new HashSet<>(allInputPortIds), new HashSet<>(allOutputPortIds));
+ OverlapResults<String> overlapResults = findOverlap(new HashSet<>(allProcessorIds), new HashSet<>(allRemoteInputPortIds), new HashSet<>(allInputPortIds), new HashSet<>(allOutputPortIds),
+ new HashSet<>(allFunnelIds));
if (overlapResults.duplicates.size() > 0) {
addValidationIssue(FOUND_THE_FOLLOWING_DUPLICATE_IDS + overlapResults.duplicates.stream().sorted().collect(Collectors.joining(", ")));
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FunnelSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FunnelSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FunnelSchema.java
new file mode 100644
index 0000000..ee0b31b
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FunnelSchema.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.BaseSchemaWithId;
+
+import java.util.Map;
+
+public class FunnelSchema extends BaseSchemaWithId {
+ public FunnelSchema(Map map) {
+ super(map, "Funnel(id: {id})");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchema.java
index 88688fc..7be69c0 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchema.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchema.java
@@ -28,8 +28,10 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONNECTIONS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FUNNELS_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ID_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.INPUT_PORTS_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY;
@@ -44,6 +46,7 @@ public class ProcessGroupSchema extends BaseSchemaWithIdAndName implements Writa
private String comment;
private List<ProcessorSchema> processors;
+ private List<FunnelSchema> funnels;
private List<ConnectionSchema> connections;
private List<RemoteProcessingGroupSchema> remoteProcessingGroups;
private List<ProcessGroupSchema> processGroupSchemas;
@@ -54,6 +57,7 @@ public class ProcessGroupSchema extends BaseSchemaWithIdAndName implements Writa
super(map, wrapperName);
processors = getOptionalKeyAsList(map, PROCESSORS_KEY, ProcessorSchema::new, wrapperName);
+ funnels = getOptionalKeyAsList(map, FUNNELS_KEY, FunnelSchema::new, wrapperName);
remoteProcessingGroups = getOptionalKeyAsList(map, REMOTE_PROCESSING_GROUPS_KEY, RemoteProcessingGroupSchema::new, wrapperName);
connections = getOptionalKeyAsList(map, CONNECTIONS_KEY, ConnectionSchema::new, wrapperName);
inputPortSchemas = getOptionalKeyAsList(map, INPUT_PORTS_KEY, m -> new PortSchema(m, "InputPort(id: {id}, name: {name})"), wrapperName);
@@ -74,9 +78,14 @@ public class ProcessGroupSchema extends BaseSchemaWithIdAndName implements Writa
Set<String> portIds = getPortIds();
connections.stream().filter(c -> portIds.contains(c.getSourceId())).forEachOrdered(c -> c.setNeedsSourceRelationships(false));
+
+ Set<String> funnelIds = new HashSet<>(funnels.stream().map(FunnelSchema::getId).collect(Collectors.toList()));
+ connections.stream().filter(c -> funnelIds.contains(c.getSourceId())).forEachOrdered(c -> c.setNeedsSourceRelationships(false));
+
addIssuesIfNotNull(processors);
addIssuesIfNotNull(remoteProcessingGroups);
addIssuesIfNotNull(processGroupSchemas);
+ addIssuesIfNotNull(funnels);
addIssuesIfNotNull(connections);
}
@@ -91,6 +100,7 @@ public class ProcessGroupSchema extends BaseSchemaWithIdAndName implements Writa
putListIfNotNull(result, PROCESS_GROUPS_KEY, processGroupSchemas);
putListIfNotNull(result, INPUT_PORTS_KEY, inputPortSchemas);
putListIfNotNull(result, OUTPUT_PORTS_KEY, outputPortSchemas);
+ putListIfNotNull(result, FUNNELS_KEY, funnels);
putListIfNotNull(result, CONNECTIONS_KEY, connections);
putListIfNotNull(result, REMOTE_PROCESSING_GROUPS_KEY, remoteProcessingGroups);
return result;
@@ -100,6 +110,10 @@ public class ProcessGroupSchema extends BaseSchemaWithIdAndName implements Writa
return processors;
}
+ public List<FunnelSchema> getFunnels() { // accessor for the funnel schemas held by this process group
+ return funnels; // the list built in the constructor from the optional FUNNELS_KEY entry; returned directly, not copied
+ }
+
public List<ConnectionSchema> getConnections() {
return connections;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithId.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithId.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithId.java
new file mode 100644
index 0000000..96b47a7
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithId.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.nifi.minifi.commons.schema.common;
+
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ID_KEY;
+
+/**
+ * Base class for schema elements that are identified by an id.
+ *
+ * <p>The id is read from the {@code ID_KEY} entry of the backing map, written back under the same
+ * key by {@link #toMap()}, and checked against {@link #VALID_ID_PATTERN} when validation issues
+ * are collected.</p>
+ */
+public class BaseSchemaWithId extends BaseSchema implements WritableSchema {
+    /** An id is valid when it consists solely of ASCII letters, digits, underscores and hyphens. */
+    public static final Pattern VALID_ID_PATTERN = Pattern.compile("[A-Za-z0-9_-]+");
+    /** Issue text prefix for an id that fails {@link #VALID_ID_PATTERN}; the offending id is appended when reported. */
+    public static final String ID_DOES_NOT_MATCH_VALID_ID_PATTERN = "Id does not match valid pattern (" + VALID_ID_PATTERN + "): ";
+
+    /** Wrapper name template; may contain an {@code {id}} placeholder resolved by {@link #getWrapperName()}. */
+    private final String wrapperName;
+    private String id;
+
+    /**
+     * Reads the id from the given map.
+     *
+     * @param map the backing map of this schema element
+     * @param wrapperName the wrapper name template used when reporting issues
+     */
+    public BaseSchemaWithId(Map map, String wrapperName) {
+        // getId is overridable and runs before any subclass field is initialised,
+        // so overrides must not depend on subclass state.
+        id = getId(map, wrapperName);
+        this.wrapperName = wrapperName;
+    }
+
+    /**
+     * Looks up the optional {@code ID_KEY} entry as a String, defaulting to the empty string.
+     *
+     * @param map the backing map of this schema element
+     * @param wrapperName the wrapper name passed on to the lookup helper
+     * @return the id found in the map, or {@code ""} when absent
+     */
+    protected String getId(Map map, String wrapperName) {
+        return getOptionalKeyAsType(map, ID_KEY, String.class, wrapperName, "");
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    /**
+     * Resolves the {@code {id}} placeholder of the wrapper name template.
+     *
+     * @return the wrapper name with {@code {id}} replaced by the id, or by a fallback text when the id is null or empty
+     */
+    public String getWrapperName() {
+        // The fallback literal is emitted at runtime and is intentionally left byte-for-byte unchanged.
+        return wrapperName.replace("{id}", StringUtil.isNullOrEmpty(id) ? "unkown" : id);
+    }
+
+    /**
+     * @return a map obtained from {@code mapSupplier} holding the id under {@code ID_KEY}
+     */
+    @Override
+    public Map<String, Object> toMap() {
+        Map<String, Object> map = mapSupplier.get();
+        map.put(ID_KEY, id);
+        return map;
+    }
+
+    /**
+     * Adds an issue when the id is missing, or when it does not fully match {@link #VALID_ID_PATTERN}.
+     *
+     * @return the issues reported by the superclass plus at most one id-related issue
+     */
+    @Override
+    public List<String> getValidationIssues() {
+        List<String> validationIssues = super.getValidationIssues();
+        if (StringUtil.isNullOrEmpty(id)) {
+            validationIssues.add(getIssueText(CommonPropertyKeys.ID_KEY, getWrapperName(), IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED));
+        } else if (!VALID_ID_PATTERN.matcher(id).matches()) {
+            // The message constant ends with ": ", so the rejected id has to follow it;
+            // without it the issue text ends in a dangling colon.
+            validationIssues.add(getIssueText(CommonPropertyKeys.ID_KEY, getWrapperName(), ID_DOES_NOT_MATCH_VALID_ID_PATTERN + id));
+        }
+        return validationIssues;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java
index a1f7bb5..71a4f41 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java
@@ -19,39 +19,18 @@
package org.apache.nifi.minifi.commons.schema.common;
-import java.util.List;
import java.util.Map;
-import java.util.regex.Pattern;
-import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ID_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY;
-public abstract class BaseSchemaWithIdAndName extends BaseSchema implements WritableSchema {
- public static final Pattern VALID_ID_PATTERN = Pattern.compile("[A-Za-z0-9_-]+");
- public static final String ID_DOES_NOT_MATCH_VALID_ID_PATTERN = "Id does not match valid pattern (" + VALID_ID_PATTERN + ")";
-
- private final String wrapperName;
- private String id;
+public abstract class BaseSchemaWithIdAndName extends BaseSchemaWithId implements WritableSchema {
private String name;
public BaseSchemaWithIdAndName(Map map, String wrapperName) {
- this.wrapperName = wrapperName;
- id = getId(map, getWrapperName());
+ super(map, wrapperName);
name = getOptionalKeyAsType(map, NAME_KEY, String.class, getWrapperName(), "");
}
- protected String getId(Map map, String wrapperName) {
- return getOptionalKeyAsType(map, ID_KEY, String.class, wrapperName, "");
- }
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
public String getName() {
return name;
}
@@ -61,25 +40,13 @@ public abstract class BaseSchemaWithIdAndName extends BaseSchema implements Writ
}
public String getWrapperName() {
- return wrapperName.replace("{id}", StringUtil.isNullOrEmpty(id) ? "unkown" : id).replace("{name}", StringUtil.isNullOrEmpty(name) ? "unkown" : name);
+ return super.getWrapperName().replace("{name}", StringUtil.isNullOrEmpty(name) ? "unkown" : name);
}
@Override
public Map<String, Object> toMap() {
- Map<String, Object> map = mapSupplier.get();
- map.put(ID_KEY, id);
+ Map<String, Object> map = super.toMap();
map.put(NAME_KEY, name);
return map;
}
-
- @Override
- public List<String> getValidationIssues() {
- List<String> validationIssues = super.getValidationIssues();
- if (StringUtil.isNullOrEmpty(id)) {
- validationIssues.add(getIssueText(CommonPropertyKeys.ID_KEY, getWrapperName(), IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED));
- } else if (!VALID_ID_PATTERN.matcher(id).matches()) {
- validationIssues.add(getIssueText(CommonPropertyKeys.ID_KEY, getWrapperName(), ID_DOES_NOT_MATCH_VALID_ID_PATTERN));
- }
- return validationIssues;
- }
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java
index a603f3e..dcf0500 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java
@@ -32,6 +32,7 @@ public class CommonPropertyKeys {
public static final String REMOTE_PROCESSING_GROUPS_KEY = "Remote Processing Groups";
public static final String INPUT_PORTS_KEY = "Input Ports";
public static final String OUTPUT_PORTS_KEY = "Output Ports";
+ public static final String FUNNELS_KEY = "Funnels";
public static final String PROVENANCE_REPO_KEY = "Provenance Repository";
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-docs/src/main/markdown/System_Admin_Guide.md
----------------------------------------------------------------------
diff --git a/minifi-docs/src/main/markdown/System_Admin_Guide.md b/minifi-docs/src/main/markdown/System_Admin_Guide.md
index 0836443..f672074 100644
--- a/minifi-docs/src/main/markdown/System_Admin_Guide.md
+++ b/minifi-docs/src/main/markdown/System_Admin_Guide.md
@@ -404,6 +404,7 @@ Remote Processing Groups | The remote processing groups contained in
Connections | The connections contained in this Process Group. (Defined below)
Input Ports | The input ports contained in this Process Group. (Defined below)
Output Ports | The output ports contained in this Process Group. (Defined below)
+Funnels | The funnels contained in this Process Group. (Defined below)
Process Groups | The child Process Groups contained in this Process Group.
## Input Ports
@@ -424,6 +425,14 @@ These ports provide output from the Process Group they reside on. (Currently onl
name | The name of what this output port will do.
id | The id of this output port. This needs to be set to a unique filesystem-friendly value (regex: [A-Za-z0-9_-]+)
+## Funnels
+
+Funnels can be used to combine outputs from multiple processors into a single connection for ease of design.
+
+*Property* | *Description*
+-------------------- | -------------
+id | The id of this funnel. This needs to be set to a unique filesystem-friendly value (regex: [A-Za-z0-9_-]+)
+
## Connections
There can be multiple connections in this version of MiNiFi. The "Connections" subsection is a list of connections. Each connection must specify these properties.
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/config.yml
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/config.yml b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/config.yml
index c88e47e..a6d13f6 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/config.yml
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/config.yml
@@ -59,5 +59,6 @@ Processors: []
Process Groups: []
Input Ports: []
Output Ports: []
+Funnels: []
Connections: []
Remote Processing Groups: []
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMain.java
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMain.java b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMain.java
index e62392d..b726c5a 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMain.java
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMain.java
@@ -63,7 +63,6 @@ public class ConfigMain {
public static final int ERR_UNABLE_TO_OPEN_OUTPUT = 2;
public static final int ERR_UNABLE_TO_OPEN_INPUT = 3;
public static final int ERR_UNABLE_TO_READ_TEMPLATE = 4;
- public static final int ERR_UNABLE_TO_TRANSFORM_TEMPLATE = 5;
public static final int ERR_UNABLE_TO_PARSE_CONFIG = 6;
public static final int ERR_INVALID_CONFIG = 7;
public static final int ERR_UNABLE_TO_CLOSE_CONFIG = 8;
@@ -171,7 +170,8 @@ public class ConfigMain {
StringBuilder name = new StringBuilder();
ConnectableDTO connectionSource = connection.getSource();
if (connectionSource != null) {
- name.append(connectionSource.getName());
+ String connectionSourceName = connectionSource.getName();
+ name.append(StringUtil.isNullOrEmpty(connectionSourceName) ? connectionSource.getId() : connectionSourceName);
}
name.append("/");
if (connection.getSelectedRelationships() != null && connection.getSelectedRelationships().size() > 0) {
@@ -180,7 +180,8 @@ public class ConfigMain {
name.append("/");
ConnectableDTO connectionDestination = connection.getDestination();
if (connectionDestination != null) {
- name.append(connectionDestination.getName());
+ String connectionDestinationName = connectionDestination.getName();
+ name.append(StringUtil.isNullOrEmpty(connectionDestinationName) ? connectionDestination.getId() : connectionDestinationName);
}
connection.setName(name.toString());
}
@@ -203,14 +204,9 @@ public class ConfigMain {
nullToEmpty(flowSnippetDTO.getProcessGroups()).stream().map(ProcessGroupDTO::getContents).forEach(f -> getAllFlowSnippets(f, result));
}
- public static ConfigSchema transformTemplateToSchema(InputStream source) throws JAXBException, IOException, SchemaLoaderException {
+ public static ConfigSchema transformTemplateToSchema(InputStream source) throws JAXBException, IOException {
try {
TemplateDTO templateDTO = (TemplateDTO) JAXBContext.newInstance(TemplateDTO.class).createUnmarshaller().unmarshal(source);
-
- if (templateDTO.getSnippet().getFunnels().size() != 0){
- throw new SchemaLoaderException("Funnels are not currently supported in MiNiFi. Please remove any from the template and try again.");
- }
-
enrichFlowSnippetDTO(templateDTO.getSnippet());
ConfigSchema configSchema = new ConfigSchemaFunction().apply(templateDTO);
return configSchema;
@@ -329,10 +325,6 @@ public class ConfigMain {
System.out.println();
printTransformUsage();
return ERR_UNABLE_TO_READ_TEMPLATE;
- } catch (SchemaLoaderException e) {
- System.out.println("Error transforming template to YAML. (" + e.getMessage() + ")");
- System.out.println();
- return ERR_UNABLE_TO_TRANSFORM_TEMPLATE;
}
} catch (FileNotFoundException e) {
return handleErrorOpeningInput(args[1], ConfigMain::printTransformUsage, e);
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java
index 9cdccf5..d6aaabf 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java
@@ -19,10 +19,12 @@ package org.apache.nifi.minifi.toolkit.configuration.dto;
import org.apache.nifi.minifi.commons.schema.ConfigSchema;
import org.apache.nifi.minifi.commons.schema.ConnectionSchema;
+import org.apache.nifi.minifi.commons.schema.FunnelSchema;
import org.apache.nifi.minifi.commons.schema.PortSchema;
import org.apache.nifi.minifi.commons.schema.ProcessGroupSchema;
import org.apache.nifi.minifi.commons.schema.ProcessorSchema;
import org.apache.nifi.minifi.commons.schema.RemoteProcessingGroupSchema;
+import org.apache.nifi.minifi.commons.schema.common.CollectionUtil;
import org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys;
import org.apache.nifi.minifi.commons.schema.common.StringUtil;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
@@ -44,20 +46,23 @@ public class ConfigSchemaFunction implements Function<TemplateDTO, ConfigSchema>
private final FlowControllerSchemaFunction flowControllerSchemaFunction;
private final ProcessorSchemaFunction processorSchemaFunction;
private final ConnectionSchemaFunction connectionSchemaFunction;
+ private final FunnelSchemaFunction funnelSchemaFunction;
private final RemoteProcessingGroupSchemaFunction remoteProcessingGroupSchemaFunction;
private final PortSchemaFunction inputPortSchemaFunction;
private final PortSchemaFunction outputPortSchemaFunction;
public ConfigSchemaFunction() {
- this(new FlowControllerSchemaFunction(), new ProcessorSchemaFunction(), new ConnectionSchemaFunction(), new RemoteProcessingGroupSchemaFunction(new RemoteInputPortSchemaFunction()),
- new PortSchemaFunction(INPUT_PORTS_KEY), new PortSchemaFunction(OUTPUT_PORTS_KEY));
+ this(new FlowControllerSchemaFunction(), new ProcessorSchemaFunction(), new ConnectionSchemaFunction(), new FunnelSchemaFunction(), new RemoteProcessingGroupSchemaFunction(
+ new RemoteInputPortSchemaFunction()), new PortSchemaFunction(INPUT_PORTS_KEY), new PortSchemaFunction(OUTPUT_PORTS_KEY));
}
public ConfigSchemaFunction(FlowControllerSchemaFunction flowControllerSchemaFunction, ProcessorSchemaFunction processorSchemaFunction, ConnectionSchemaFunction connectionSchemaFunction,
- RemoteProcessingGroupSchemaFunction remoteProcessingGroupSchemaFunction, PortSchemaFunction inputPortSchemaFunction, PortSchemaFunction outputPortSchemaFunction) {
+ FunnelSchemaFunction funnelSchemaFunction, RemoteProcessingGroupSchemaFunction remoteProcessingGroupSchemaFunction, PortSchemaFunction inputPortSchemaFunction,
+ PortSchemaFunction outputPortSchemaFunction) {
this.flowControllerSchemaFunction = flowControllerSchemaFunction;
this.processorSchemaFunction = processorSchemaFunction;
this.connectionSchemaFunction = connectionSchemaFunction;
+ this.funnelSchemaFunction = funnelSchemaFunction;
this.remoteProcessingGroupSchemaFunction = remoteProcessingGroupSchemaFunction;
this.inputPortSchemaFunction = inputPortSchemaFunction;
this.outputPortSchemaFunction = outputPortSchemaFunction;
@@ -95,14 +100,18 @@ public class ConfigSchemaFunction implements Function<TemplateDTO, ConfigSchema>
.map(ProcessorSchema::toMap)
.collect(Collectors.toList()));
-
-
map.put(CommonPropertyKeys.CONNECTIONS_KEY, nullToEmpty(snippet.getConnections()).stream()
.map(connectionSchemaFunction)
.sorted(Comparator.comparing(ConnectionSchema::getName))
.map(ConnectionSchema::toMap)
.collect(Collectors.toList()));
+ map.put(CommonPropertyKeys.FUNNELS_KEY, CollectionUtil.nullToEmpty(snippet.getFunnels()).stream()
+ .map(funnelSchemaFunction)
+ .sorted(Comparator.comparing(FunnelSchema::getId))
+ .map(FunnelSchema::toMap)
+ .collect(Collectors.toList()));
+
map.put(CommonPropertyKeys.REMOTE_PROCESSING_GROUPS_KEY, nullToEmpty(snippet.getRemoteProcessGroups()).stream()
.map(remoteProcessingGroupSchemaFunction)
.sorted(Comparator.comparing(RemoteProcessingGroupSchema::getName))
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/FunnelSchemaFunction.java
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/FunnelSchemaFunction.java b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/FunnelSchemaFunction.java
new file mode 100644
index 0000000..a03b105
--- /dev/null
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/FunnelSchemaFunction.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.dto;
+
+import org.apache.nifi.minifi.commons.schema.FunnelSchema;
+import org.apache.nifi.web.api.dto.FunnelDTO;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ID_KEY;
+
+/**
+ * Converts a {@link FunnelDTO} into a {@link FunnelSchema}.
+ *
+ * <p>Only the funnel's id is carried over; no other DTO property is read.</p>
+ */
+public class FunnelSchemaFunction implements Function<FunnelDTO, FunnelSchema> {
+    /**
+     * Builds the schema for a single funnel.
+     *
+     * @param funnelDTO the funnel to convert
+     * @return a schema constructed from a map whose {@code ID_KEY} entry is the DTO's id
+     */
+    @Override
+    public FunnelSchema apply(FunnelDTO funnelDTO) {
+        final String funnelId = funnelDTO.getId();
+        final Map<String, Object> schemaProperties = new HashMap<>();
+        schemaProperties.put(ID_KEY, funnelId);
+        return new FunnelSchema(schemaProperties);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java b/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java
index d61a641..9f7559e 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java
@@ -18,6 +18,7 @@
package org.apache.nifi.minifi.toolkit.configuration;
import org.apache.commons.io.Charsets;
+import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.minifi.commons.schema.ConfigSchema;
import org.apache.nifi.minifi.commons.schema.ConnectionSchema;
import org.apache.nifi.minifi.commons.schema.ProcessorSchema;
@@ -148,8 +149,8 @@ public class ConfigMainTest {
@Test
public void testTransformErrorTransformingTemplate() throws FileNotFoundException {
when(pathInputStreamFactory.create(testInput)).thenAnswer(invocation ->
- ConfigMainTest.class.getClassLoader().getResourceAsStream("TemplateWithFunnel.xml"));
- assertEquals(ConfigMain.ERR_UNABLE_TO_TRANSFORM_TEMPLATE, configMain.execute(new String[]{ConfigMain.TRANSFORM, testInput, testOutput}));
+ new LimitedInputStream(ConfigMainTest.class.getClassLoader().getResourceAsStream("TemplateWithFunnel.xml"), 25));
+ assertEquals(ConfigMain.ERR_UNABLE_TO_READ_TEMPLATE, configMain.execute(new String[]{ConfigMain.TRANSFORM, testInput, testOutput}));
}
@Test
@@ -186,6 +187,11 @@ public class ConfigMainTest {
}
@Test
+ public void testTransformRoundTripStressTestFrameworkFunnel() throws IOException, JAXBException, SchemaLoaderException {
+ transformRoundTrip("StressTestFrameworkFunnel");
+ }
+
+ @Test
public void testTransformRoundTripMultipleRelationships() throws IOException, JAXBException, SchemaLoaderException {
transformRoundTrip("MultipleRelationships");
}
@@ -210,8 +216,8 @@ public class ConfigMainTest {
ConfigMain.transformTemplateToSchema(getClass().getClassLoader().getResourceAsStream("TemplateWithInputPort.xml")).toMap();
}
- @Test(expected = SchemaLoaderException.class)
- public void testFailToTransformFunnel() throws IOException, JAXBException, SchemaLoaderException {
+ @Test
+ public void testSuccessTransformFunnel() throws IOException, JAXBException, SchemaLoaderException {
ConfigMain.transformTemplateToSchema(getClass().getClassLoader().getResourceAsStream("TemplateWithFunnel.xml")).toMap();
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/CsvToJson.yml
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/CsvToJson.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/CsvToJson.yml
index d03bd16..3707335 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/CsvToJson.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/CsvToJson.yml
@@ -151,6 +151,7 @@ Processors:
Process Groups: []
Input Ports: []
Output Ports: []
+Funnels: []
Connections:
- name: ExtractText/matched/ReplaceText2
id: 56ef3e2e-ee35-4598-9fbe-ae86050960b0
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/DecompressionCircularFlow.yml
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/DecompressionCircularFlow.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/DecompressionCircularFlow.yml
index d2f90b2..c27fb62 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/DecompressionCircularFlow.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/DecompressionCircularFlow.yml
@@ -190,6 +190,7 @@ Processors:
Process Groups: []
Input Ports: []
Output Ports: []
+Funnels: []
Connections:
- name: Compressed?/gzip/Uncompress GZIP
id: 5de215d5-9f7e-414b-98aa-2edaa0514d99
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml
index aa7d6d5..b518f28 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml
@@ -169,6 +169,7 @@ Processors:
Process Groups: []
Input Ports: []
Output Ports: []
+Funnels: []
Connections:
- name: Route On Status Code/200/LogAttribute
id: 3039718a-bb40-4811-9b74-ecbe926daae8
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MultipleRelationships.yml
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MultipleRelationships.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MultipleRelationships.yml
index 12ed7e1..4e3b479 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MultipleRelationships.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MultipleRelationships.yml
@@ -117,6 +117,7 @@ Processors:
Process Groups: []
Input Ports: []
Output Ports: []
+Funnels: []
Connections:
- name: GenerateFlowFile/success/UpdateAttribute
id: 7c79cce3-0157-1000-0000-000000000000
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ProcessGroupsAndRemoteProcessGroups.yml
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ProcessGroupsAndRemoteProcessGroups.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ProcessGroupsAndRemoteProcessGroups.yml
index 101a360..7783c53 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ProcessGroupsAndRemoteProcessGroups.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ProcessGroupsAndRemoteProcessGroups.yml
@@ -142,6 +142,7 @@ Process Groups:
Output Ports:
- id: 207a6d92-0158-1000-0000-000000000000
name: output
+ Funnels: []
Connections:
- id: 21a6abb9-0158-1000-0000-000000000000
name: UpdateAttribute/success/21a39aba-0158-1000-a1a0-1b55bcddcd72
@@ -190,6 +191,7 @@ Process Groups:
Output Ports:
- id: 2079b327-0158-1000-0000-000000000000
name: output
+ Funnels: []
Connections:
- id: 21a5b1f1-0158-1000-0000-000000000000
name: UpdateAttribute/success/21a2fb5e-0158-1000-3b5e-5a7d3aaee01b
@@ -243,6 +245,7 @@ Process Groups:
use compression: false
Input Ports: []
Output Ports: []
+Funnels: []
Connections:
- id: 2077bf8f-0158-1000-0000-000000000000
name: GenerateFlowFile/success/UpdateAttribute
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ReplaceTextExpressionLanguageCSVReformatting.yml
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ReplaceTextExpressionLanguageCSVReformatting.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ReplaceTextExpressionLanguageCSVReformatting.yml
index 36bdcd8..64a9108 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ReplaceTextExpressionLanguageCSVReformatting.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ReplaceTextExpressionLanguageCSVReformatting.yml
@@ -126,6 +126,7 @@ Processors:
Process Groups: []
Input Ports: []
Output Ports: []
+Funnels: []
Connections:
- name: Generate Empty File/success/Set CSV Content
id: ca71a875-0ff5-41ef-bbe0-da5de0ca1e08
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/StressTestFramework.yml
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/StressTestFramework.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/StressTestFramework.yml
index d135be8..2df4526 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/StressTestFramework.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/StressTestFramework.yml
@@ -101,6 +101,7 @@ Processors:
Process Groups: []
Input Ports: []
Output Ports: []
+Funnels: []
Connections:
- name: GenerateFlowFile/success/UpdateAttribute
id: 0e6873cc-cb9d-4e98-92aa-a3319f4c1b02