You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/11/09 21:49:14 UTC

[2/2] nifi-minifi git commit: MINIFI-47 - Funnel support

MINIFI-47 - Funnel support

This closes #52

Signed-off-by: Joseph Percivall <JP...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/12a58a86
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/12a58a86
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/12a58a86

Branch: refs/heads/master
Commit: 12a58a8698c3f661d14156c83c8ea0de2f7a74dd
Parents: 31855bb
Author: Bryan Rosander <br...@apache.org>
Authored: Thu Nov 3 11:14:56 2016 -0400
Committer: Joseph Percivall <JP...@apache.org>
Committed: Wed Nov 9 16:48:48 2016 -0500

----------------------------------------------------------------------
 .../bootstrap/util/ConfigTransformer.java       |  25 +-
 .../bootstrap/util/ParentGroupIdResolver.java   |  12 +-
 .../bootstrap/util/ConfigTransformerTest.java   |  22 +-
 .../util/ParentGroupIdResolverTest.java         |  31 ++
 .../resources/stress-test-framework-funnel.yml  | 211 ++++++++
 .../minifi/commons/schema/ConfigSchema.java     |   6 +-
 .../minifi/commons/schema/FunnelSchema.java     |  30 ++
 .../commons/schema/ProcessGroupSchema.java      |  14 +
 .../commons/schema/common/BaseSchemaWithId.java |  73 +++
 .../schema/common/BaseSchemaWithIdAndName.java  |  41 +-
 .../schema/common/CommonPropertyKeys.java       |   1 +
 .../src/main/markdown/System_Admin_Guide.md     |   9 +
 .../src/main/resources/conf/config.yml          |   1 +
 .../toolkit/configuration/ConfigMain.java       |  18 +-
 .../configuration/dto/ConfigSchemaFunction.java |  19 +-
 .../configuration/dto/FunnelSchemaFunction.java |  38 ++
 .../toolkit/configuration/ConfigMainTest.java   |  14 +-
 .../src/test/resources/CsvToJson.yml            |   1 +
 .../resources/DecompressionCircularFlow.yml     |   1 +
 .../resources/InvokeHttpMiNiFiTemplateTest.yml  |   1 +
 .../test/resources/MultipleRelationships.yml    |   1 +
 .../ProcessGroupsAndRemoteProcessGroups.yml     |   3 +
 ...aceTextExpressionLanguageCSVReformatting.yml |   1 +
 .../src/test/resources/StressTestFramework.yml  |   1 +
 .../resources/StressTestFrameworkFunnel.xml     | 540 +++++++++++++++++++
 .../resources/StressTestFrameworkFunnel.yml     | 211 ++++++++
 26 files changed, 1259 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
index 9fa7f05..19d3268 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
@@ -27,6 +27,7 @@ import org.apache.nifi.minifi.commons.schema.ContentRepositorySchema;
 import org.apache.nifi.minifi.commons.schema.CorePropertiesSchema;
 import org.apache.nifi.minifi.commons.schema.FlowControllerSchema;
 import org.apache.nifi.minifi.commons.schema.FlowFileRepositorySchema;
+import org.apache.nifi.minifi.commons.schema.FunnelSchema;
 import org.apache.nifi.minifi.commons.schema.PortSchema;
 import org.apache.nifi.minifi.commons.schema.ProcessGroupSchema;
 import org.apache.nifi.minifi.commons.schema.ProcessorSchema;
@@ -369,6 +370,10 @@ public final class ConfigTransformer {
                 addRemoteProcessGroup(element, remoteProcessingGroupSchema);
             }
 
+            for (FunnelSchema funnelSchema : processGroupSchema.getFunnels()) {
+                addFunnel(element, funnelSchema);
+            }
+
             for (PortSchema portSchema : processGroupSchema.getInputPortSchemas()) {
                 addPort(doc, element, portSchema, "inputPort");
             }
@@ -448,6 +453,17 @@ public final class ConfigTransformer {
         }
     }
 
+
+    protected static void addFunnel(final Element parentElement, FunnelSchema funnelSchema) {
+        Document document = parentElement.getOwnerDocument();
+        Element element = document.createElement("funnel");
+        parentElement.appendChild(element);
+
+        addTextElement(element, "id", funnelSchema.getId());
+
+        addPosition(element);
+    }
+
     protected static void addProvenanceReportingTask(final Element element, ConfigSchema configSchema) throws ConfigurationChangeException {
         try {
             ProvenanceReportingSchema provenanceProperties = configSchema.getProvenanceReportingProperties();
@@ -605,8 +621,13 @@ public final class ConfigTransformer {
                 if (parentId != null) {
                     type = "OUTPUT_PORT";
                 } else {
-                    parentId = parentGroupIdResolver.getProcessorParentId(id);
-                    type = "PROCESSOR";
+                    parentId = parentGroupIdResolver.getFunnelParentId(id);
+                    if (parentId != null) {
+                        type = "FUNNEL";
+                    } else {
+                        parentId = parentGroupIdResolver.getProcessorParentId(id);
+                        type = "PROCESSOR";
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolver.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolver.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolver.java
index 71088ee..05aa27d 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolver.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolver.java
@@ -22,7 +22,7 @@ package org.apache.nifi.minifi.bootstrap.util;
 import org.apache.nifi.minifi.commons.schema.ProcessGroupSchema;
 import org.apache.nifi.minifi.commons.schema.RemoteInputPortSchema;
 import org.apache.nifi.minifi.commons.schema.RemoteProcessingGroupSchema;
-import org.apache.nifi.minifi.commons.schema.common.BaseSchemaWithIdAndName;
+import org.apache.nifi.minifi.commons.schema.common.BaseSchemaWithId;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -33,23 +33,25 @@ public class ParentGroupIdResolver {
     private final Map<String, String> processorIdToParentIdMap;
     private final Map<String, String> inputPortIdToParentIdMap;
     private final Map<String, String> outputPortIdToParentIdMap;
+    private final Map<String, String> funnelIdToParentIdMap;
     private final Map<String, String> remoteInputPortIdToParentIdMap;
 
     public ParentGroupIdResolver(ProcessGroupSchema processGroupSchema) {
         this.processorIdToParentIdMap = getParentIdMap(processGroupSchema, ProcessGroupSchema::getProcessors);
         this.inputPortIdToParentIdMap = getParentIdMap(processGroupSchema, ProcessGroupSchema::getInputPortSchemas);
         this.outputPortIdToParentIdMap = getParentIdMap(processGroupSchema, ProcessGroupSchema::getOutputPortSchemas);
+        this.funnelIdToParentIdMap = getParentIdMap(processGroupSchema, ProcessGroupSchema::getFunnels);
         this.remoteInputPortIdToParentIdMap = getRemoteInputPortParentIdMap(processGroupSchema);
     }
 
-    protected static Map<String, String> getParentIdMap(ProcessGroupSchema processGroupSchema, Function<ProcessGroupSchema, Collection<? extends BaseSchemaWithIdAndName>> schemaAccessor) {
+    protected static Map<String, String> getParentIdMap(ProcessGroupSchema processGroupSchema, Function<ProcessGroupSchema, Collection<? extends BaseSchemaWithId>> schemaAccessor) {
         Map<String, String> map = new HashMap<>();
         getParentIdMap(processGroupSchema, map, schemaAccessor);
         return map;
     }
 
     protected static void getParentIdMap(ProcessGroupSchema processGroupSchema, Map<String, String> output, Function<ProcessGroupSchema,
-            Collection<? extends BaseSchemaWithIdAndName>> schemaAccessor) {
+            Collection<? extends BaseSchemaWithId>> schemaAccessor) {
         schemaAccessor.apply(processGroupSchema).forEach(p -> output.put(p.getId(), processGroupSchema.getId()));
         processGroupSchema.getProcessGroupSchemas().forEach(p -> getParentIdMap(p, output, schemaAccessor));
     }
@@ -84,4 +86,8 @@ public class ParentGroupIdResolver {
     public String getProcessorParentId(String id) {
         return processorIdToParentIdMap.get(id);
     }
+
+    public String getFunnelParentId(String id) {
+        return funnelIdToParentIdMap.get(id);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
index cac9d16..b41dc90 100644
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
@@ -20,6 +20,7 @@ package org.apache.nifi.minifi.bootstrap.util;
 import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
 import org.apache.nifi.minifi.commons.schema.ConfigSchema;
 import org.apache.nifi.minifi.commons.schema.ConnectionSchema;
+import org.apache.nifi.minifi.commons.schema.FunnelSchema;
 import org.apache.nifi.minifi.commons.schema.PortSchema;
 import org.apache.nifi.minifi.commons.schema.ProcessGroupSchema;
 import org.apache.nifi.minifi.commons.schema.ProcessorSchema;
@@ -100,7 +101,16 @@ public class ConfigTransformerTest {
 
     @Test
     public void testProcessGroupsTransform() throws Exception {
-        ConfigSchema configSchema = SchemaLoader.loadConfigSchemaFromYaml(ConfigTransformerTest.class.getClassLoader().getResourceAsStream("config-process-groups.yml"));
+        testConfigFileTransform("config-process-groups.yml");
+    }
+
+    @Test
+    public void testFunnelsTransform() throws Exception {
+        testConfigFileTransform("stress-test-framework-funnel.yml");
+    }
+
+    public void testConfigFileTransform(String configFile) throws Exception {
+        ConfigSchema configSchema = SchemaLoader.loadConfigSchemaFromYaml(ConfigTransformerTest.class.getClassLoader().getResourceAsStream(configFile));
 
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
         ConfigTransformer.writeFlowXmlFile(configSchema, outputStream);
@@ -126,6 +136,12 @@ public class ConfigTransformerTest {
             testRemoteProcessGroups((Element) remoteProcessGroupElements.item(i), processGroupSchema.getRemoteProcessingGroups().get(i));
         }
 
+        NodeList funnelElements = (NodeList) xPathFactory.newXPath().evaluate("funnel", element, XPathConstants.NODESET);
+        assertEquals(processGroupSchema.getFunnels().size(), funnelElements.getLength());
+        for (int i = 0; i < funnelElements.getLength(); i++) {
+            testFunnel((Element) funnelElements.item(i), processGroupSchema.getFunnels().get(i));
+        }
+
         NodeList inputPortElements = (NodeList) xPathFactory.newXPath().evaluate("inputPort", element, XPathConstants.NODESET);
         assertEquals(processGroupSchema.getInputPortSchemas().size(), inputPortElements.getLength());
         for (int i = 0; i < inputPortElements.getLength(); i++) {
@@ -195,6 +211,10 @@ public class ConfigTransformerTest {
         assertEquals("RUNNING", getText(element, "scheduledState"));
     }
 
+    private void testFunnel(Element element, FunnelSchema funnelSchema) throws XPathExpressionException {
+        assertEquals(funnelSchema.getId(), getText(element, "id"));
+    }
+
     private void testConnection(Element element, ConnectionSchema connectionSchema) throws XPathExpressionException {
         assertEquals(connectionSchema.getId(), getText(element, "id"));
         assertEquals(connectionSchema.getName(), getText(element, "name"));

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolverTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolverTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolverTest.java
index 6f557b7..c67d250 100644
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolverTest.java
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ParentGroupIdResolverTest.java
@@ -57,6 +57,8 @@ public class ParentGroupIdResolverTest {
         assertNull(parentGroupIdResolver.getOutputPortParentId("two"));
         assertNull(parentGroupIdResolver.getProcessorParentId("one"));
         assertNull(parentGroupIdResolver.getProcessorParentId("two"));
+        assertNull(parentGroupIdResolver.getFunnelParentId("one"));
+        assertNull(parentGroupIdResolver.getFunnelParentId("two"));
     }
 
     @Test
@@ -78,6 +80,8 @@ public class ParentGroupIdResolverTest {
         assertNull(parentGroupIdResolver.getOutputPortParentId("two"));
         assertNull(parentGroupIdResolver.getProcessorParentId("one"));
         assertNull(parentGroupIdResolver.getProcessorParentId("two"));
+        assertNull(parentGroupIdResolver.getFunnelParentId("one"));
+        assertNull(parentGroupIdResolver.getFunnelParentId("two"));
     }
 
     @Test
@@ -99,6 +103,8 @@ public class ParentGroupIdResolverTest {
         assertEquals("pgTwo", parentGroupIdResolver.getOutputPortParentId("two"));
         assertNull(parentGroupIdResolver.getProcessorParentId("one"));
         assertNull(parentGroupIdResolver.getProcessorParentId("two"));
+        assertNull(parentGroupIdResolver.getFunnelParentId("one"));
+        assertNull(parentGroupIdResolver.getFunnelParentId("two"));
     }
 
     @Test
@@ -120,6 +126,31 @@ public class ParentGroupIdResolverTest {
         assertNull(parentGroupIdResolver.getOutputPortParentId("two"));
         assertEquals(ConfigTransformer.ROOT_GROUP, parentGroupIdResolver.getProcessorParentId("one"));
         assertEquals("pgTwo", parentGroupIdResolver.getProcessorParentId("two"));
+        assertNull(parentGroupIdResolver.getFunnelParentId("one"));
+        assertNull(parentGroupIdResolver.getFunnelParentId("two"));
+    }
+
+    @Test
+    public void testFunnelParentId() throws IOException, SchemaLoaderException {
+        List<String> configLines = new ArrayList<>();
+        configLines.add("MiNiFi Config Version: 2");
+        configLines.add("Funnels:");
+        configLines.add("- id: one");
+        configLines.add("Process Groups:");
+        configLines.add("- id: pgTwo");
+        configLines.add("  Funnels:");
+        configLines.add("  - id: two");
+        ParentGroupIdResolver parentGroupIdResolver = createParentGroupIdResolver(configLines);
+        assertNull(parentGroupIdResolver.getRemoteInputPortParentId("one"));
+        assertNull(parentGroupIdResolver.getRemoteInputPortParentId("two"));
+        assertNull(parentGroupIdResolver.getInputPortParentId("one"));
+        assertNull(parentGroupIdResolver.getInputPortParentId("two"));
+        assertNull(parentGroupIdResolver.getOutputPortParentId("one"));
+        assertNull(parentGroupIdResolver.getOutputPortParentId("two"));
+        assertNull(parentGroupIdResolver.getProcessorParentId("one"));
+        assertNull(parentGroupIdResolver.getProcessorParentId("two"));
+        assertEquals(ConfigTransformer.ROOT_GROUP, parentGroupIdResolver.getFunnelParentId("one"));
+        assertEquals("pgTwo", parentGroupIdResolver.getFunnelParentId("two"));
     }
 
     private ParentGroupIdResolver createParentGroupIdResolver(List<String> configLines) throws IOException, SchemaLoaderException {

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-bootstrap/src/test/resources/stress-test-framework-funnel.yml
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/resources/stress-test-framework-funnel.yml b/minifi-bootstrap/src/test/resources/stress-test-framework-funnel.yml
new file mode 100644
index 0000000..bbbda3a
--- /dev/null
+++ b/minifi-bootstrap/src/test/resources/stress-test-framework-funnel.yml
@@ -0,0 +1,211 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+MiNiFi Config Version: 2
+Flow Controller:
+  name: StressTestFramework
+  comment: ''
+Core Properties:
+  flow controller graceful shutdown period: 10 sec
+  flow service write delay interval: 500 ms
+  administrative yield duration: 30 sec
+  bored yield duration: 10 millis
+  max concurrent threads: 1
+FlowFile Repository:
+  partitions: 256
+  checkpoint interval: 2 mins
+  always sync: false
+  Swap:
+    threshold: 20000
+    in period: 5 sec
+    in threads: 1
+    out period: 5 sec
+    out threads: 4
+Content Repository:
+  content claim max appendable size: 10 MB
+  content claim max flow files: 100
+  always sync: false
+Provenance Repository:
+  provenance rollover time: 1 min
+Component Status Repository:
+  buffer size: 1440
+  snapshot frequency: 1 min
+Security Properties:
+  keystore: ''
+  keystore type: ''
+  keystore password: ''
+  key password: ''
+  truststore: ''
+  truststore type: ''
+  truststore password: ''
+  ssl protocol: ''
+  Sensitive Props:
+    key:
+    algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
+    provider: BC
+Processors:
+- name: GenerateFlowFile
+  id: 16a47794-5391-4ad2-0000-000000000000
+  class: org.apache.nifi.processors.standard.GenerateFlowFile
+  max concurrent tasks: 1
+  scheduling strategy: TIMER_DRIVEN
+  scheduling period: 0 sec
+  penalization period: 30 sec
+  yield period: 1 sec
+  run duration nanos: 0
+  auto-terminated relationships list: []
+  Properties:
+    Batch Size: '100'
+    Data Format: Binary
+    File Size: 0 KB
+    Unique FlowFiles: 'false'
+- name: GenerateFlowFile
+  id: 4ad21391-16a4-1794-0000-000000000000
+  class: org.apache.nifi.processors.standard.GenerateFlowFile
+  max concurrent tasks: 1
+  scheduling strategy: TIMER_DRIVEN
+  scheduling period: 0 sec
+  penalization period: 30 sec
+  yield period: 1 sec
+  run duration nanos: 0
+  auto-terminated relationships list: []
+  Properties:
+    Batch Size: '100'
+    Data Format: Binary
+    File Size: 0 KB
+    Unique FlowFiles: 'false'
+- name: GenerateFlowFile
+  id: 53914ad2-7794-16a4-0000-000000000000
+  class: org.apache.nifi.processors.standard.GenerateFlowFile
+  max concurrent tasks: 1
+  scheduling strategy: TIMER_DRIVEN
+  scheduling period: 0 sec
+  penalization period: 30 sec
+  yield period: 1 sec
+  run duration nanos: 0
+  auto-terminated relationships list: []
+  Properties:
+    Batch Size: '100'
+    Data Format: Binary
+    File Size: 0 KB
+    Unique FlowFiles: 'false'
+- name: GenerateFlowFile
+  id: 779416a4-4ad2-1391-0000-000000000000
+  class: org.apache.nifi.processors.standard.GenerateFlowFile
+  max concurrent tasks: 1
+  scheduling strategy: TIMER_DRIVEN
+  scheduling period: 0 sec
+  penalization period: 30 sec
+  yield period: 1 sec
+  run duration nanos: 0
+  auto-terminated relationships list: []
+  Properties:
+    Batch Size: '100'
+    Data Format: Binary
+    File Size: 0 KB
+    Unique FlowFiles: 'false'
+- name: RouteOnAttribute
+  id: 397a4910-cc01-4c6b-0000-000000000000
+  class: org.apache.nifi.processors.standard.RouteOnAttribute
+  max concurrent tasks: 2
+  scheduling strategy: TIMER_DRIVEN
+  scheduling period: 0 sec
+  penalization period: 30 sec
+  yield period: 1 sec
+  run duration nanos: 50000
+  auto-terminated relationships list:
+  - unmatched
+  Properties:
+    Routing Strategy: Route to Property name
+- name: UpdateAttribute
+  id: 92557c76-f251-45a4-0000-000000000000
+  class: org.apache.nifi.processors.attributes.UpdateAttribute
+  max concurrent tasks: 2
+  scheduling strategy: TIMER_DRIVEN
+  scheduling period: 0 sec
+  penalization period: 30 sec
+  yield period: 1 sec
+  run duration nanos: 100000
+  auto-terminated relationships list: []
+  Properties:
+    Delete Attributes Expression:
+    property 1: value 1
+    property 2: value 2 ${nextInt()}
+Process Groups: []
+Input Ports: []
+Output Ports: []
+Funnels:
+- id: 4ad21392-16a4-1794-0000-000000000000
+Connections:
+- name: 4ad21392-16a4-1794-0000-000000000000//UpdateAttribute
+  id: 4ad21397-16a4-1794-0000-000000000000
+  source id: 4ad21392-16a4-1794-0000-000000000000
+  source relationship names: []
+  destination id: 92557c76-f251-45a4-0000-000000000000
+  max work queue size: 10000
+  max work queue data size: 1 GB
+  flowfile expiration: 0 sec
+  queue prioritizer class: ''
+- name: GenerateFlowFile/success/4ad21392-16a4-1794-0000-000000000000
+  id: 4ad21393-16a4-1794-0000-000000000000
+  source id: 16a47794-5391-4ad2-0000-000000000000
+  source relationship names:
+  - success
+  destination id: 4ad21392-16a4-1794-0000-000000000000
+  max work queue size: 10000
+  max work queue data size: 1 GB
+  flowfile expiration: 0 sec
+  queue prioritizer class: ''
+- name: GenerateFlowFile/success/4ad21392-16a4-1794-0000-000000000000
+  id: 4ad21394-16a4-1794-0000-000000000000
+  source id: 53914ad2-7794-16a4-0000-000000000000
+  source relationship names:
+  - success
+  destination id: 4ad21392-16a4-1794-0000-000000000000
+  max work queue size: 10000
+  max work queue data size: 1 GB
+  flowfile expiration: 0 sec
+  queue prioritizer class: ''
+- name: GenerateFlowFile/success/4ad21392-16a4-1794-0000-000000000000
+  id: 4ad21395-16a4-1794-0000-000000000000
+  source id: 779416a4-4ad2-1391-0000-000000000000
+  source relationship names:
+  - success
+  destination id: 4ad21392-16a4-1794-0000-000000000000
+  max work queue size: 10000
+  max work queue data size: 1 GB
+  flowfile expiration: 0 sec
+  queue prioritizer class: ''
+- name: GenerateFlowFile/success/4ad21392-16a4-1794-0000-000000000000
+  id: 4ad21396-16a4-1794-0000-000000000000
+  source id: 4ad21391-16a4-1794-0000-000000000000
+  source relationship names:
+  - success
+  destination id: 4ad21392-16a4-1794-0000-000000000000
+  max work queue size: 10000
+  max work queue data size: 1 GB
+  flowfile expiration: 0 sec
+  queue prioritizer class: ''
+- name: UpdateAttribute/success/RouteOnAttribute
+  id: 4c53556e-eb46-458c-0000-000000000000
+  source id: 92557c76-f251-45a4-0000-000000000000
+  source relationship names:
+  - success
+  destination id: 397a4910-cc01-4c6b-0000-000000000000
+  max work queue size: 2000
+  max work queue data size: 0 MB
+  flowfile expiration: 0 sec
+  queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+Remote Processing Groups: []
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
index abd6a6c..333adf8 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
@@ -53,6 +53,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
     public static final String HAS_INVALID_DESTINATION_ID = " has invalid destination id ";
     public static final String FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS = "Found the following duplicate processor ids: ";
     public static final String FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS = "Found the following duplicate connection ids: ";
+    public static final String FOUND_THE_FOLLOWING_DUPLICATE_FUNNEL_IDS = "Found the following duplicate funnel ids: ";
     public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES = "Found the following duplicate remote processing group names: ";
     public static String TOP_LEVEL_NAME = "top level";
     private FlowControllerSchema flowControllerProperties;
@@ -100,6 +101,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
         List<RemoteProcessingGroupSchema> allRemoteProcessingGroups = allProcessGroups.stream().flatMap(p -> p.getRemoteProcessingGroups().stream()).collect(Collectors.toList());
 
         List<String> allProcessorIds = allProcessGroups.stream().flatMap(p -> p.getProcessors().stream()).map(ProcessorSchema::getId).collect(Collectors.toList());
+        List<String> allFunnelIds = allProcessGroups.stream().flatMap(p -> p.getFunnels().stream()).map(FunnelSchema::getId).collect(Collectors.toList());
         List<String> allConnectionIds = allConnectionSchemas.stream().map(ConnectionSchema::getId).collect(Collectors.toList());
         List<String> allRemoteProcessingGroupNames = allRemoteProcessingGroups.stream().map(RemoteProcessingGroupSchema::getName).collect(Collectors.toList());
         List<String> allRemoteInputPortIds = allRemoteProcessingGroups.stream().filter(r -> r.getInputPorts() != null)
@@ -108,6 +110,7 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
         List<String> allOutputPortIds = allProcessGroups.stream().flatMap(p -> p.getOutputPortSchemas().stream()).map(PortSchema::getId).collect(Collectors.toList());
 
         checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_IDS, allProcessorIds);
+        checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_FUNNEL_IDS, allFunnelIds);
         checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS, allConnectionIds);
         checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES, allRemoteProcessingGroupNames);
         checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS, allRemoteInputPortIds);
@@ -115,7 +118,8 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
         checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_OUTPUT_PORT_IDS, allOutputPortIds);
 
         // Potential connection sources and destinations need to have unique ids
-        OverlapResults<String> overlapResults = findOverlap(new HashSet<>(allProcessorIds), new HashSet<>(allRemoteInputPortIds), new HashSet<>(allInputPortIds), new HashSet<>(allOutputPortIds));
+        OverlapResults<String> overlapResults = findOverlap(new HashSet<>(allProcessorIds), new HashSet<>(allRemoteInputPortIds), new HashSet<>(allInputPortIds), new HashSet<>(allOutputPortIds),
+                new HashSet<>(allFunnelIds));
         if (overlapResults.duplicates.size() > 0) {
             addValidationIssue(FOUND_THE_FOLLOWING_DUPLICATE_IDS + overlapResults.duplicates.stream().sorted().collect(Collectors.joining(", ")));
         }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FunnelSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FunnelSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FunnelSchema.java
new file mode 100644
index 0000000..ee0b31b
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FunnelSchema.java
@@ -0,0 +1,30 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.BaseSchemaWithId;
+
+import java.util.Map;
+
+public class FunnelSchema extends BaseSchemaWithId {
+    public FunnelSchema(Map map) {
+        super(map, "Funnel(id: {id})");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchema.java
index 88688fc..7be69c0 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchema.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchema.java
@@ -28,8 +28,10 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONNECTIONS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FUNNELS_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ID_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.INPUT_PORTS_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY;
@@ -44,6 +46,7 @@ public class ProcessGroupSchema extends BaseSchemaWithIdAndName implements Writa
 
     private String comment;
     private List<ProcessorSchema> processors;
+    private List<FunnelSchema> funnels;
     private List<ConnectionSchema> connections;
     private List<RemoteProcessingGroupSchema> remoteProcessingGroups;
     private List<ProcessGroupSchema> processGroupSchemas;
@@ -54,6 +57,7 @@ public class ProcessGroupSchema extends BaseSchemaWithIdAndName implements Writa
         super(map, wrapperName);
 
         processors = getOptionalKeyAsList(map, PROCESSORS_KEY, ProcessorSchema::new, wrapperName);
+        funnels = getOptionalKeyAsList(map, FUNNELS_KEY, FunnelSchema::new, wrapperName);
         remoteProcessingGroups = getOptionalKeyAsList(map, REMOTE_PROCESSING_GROUPS_KEY, RemoteProcessingGroupSchema::new, wrapperName);
         connections = getOptionalKeyAsList(map, CONNECTIONS_KEY, ConnectionSchema::new, wrapperName);
         inputPortSchemas = getOptionalKeyAsList(map, INPUT_PORTS_KEY, m -> new PortSchema(m, "InputPort(id: {id}, name: {name})"), wrapperName);
@@ -74,9 +78,14 @@ public class ProcessGroupSchema extends BaseSchemaWithIdAndName implements Writa
         Set<String> portIds = getPortIds();
         connections.stream().filter(c -> portIds.contains(c.getSourceId())).forEachOrdered(c -> c.setNeedsSourceRelationships(false));
 
+
+        Set<String> funnelIds = new HashSet<>(funnels.stream().map(FunnelSchema::getId).collect(Collectors.toList()));
+        connections.stream().filter(c -> funnelIds.contains(c.getSourceId())).forEachOrdered(c -> c.setNeedsSourceRelationships(false));
+
         addIssuesIfNotNull(processors);
         addIssuesIfNotNull(remoteProcessingGroups);
         addIssuesIfNotNull(processGroupSchemas);
+        addIssuesIfNotNull(funnels);
         addIssuesIfNotNull(connections);
     }
 
@@ -91,6 +100,7 @@ public class ProcessGroupSchema extends BaseSchemaWithIdAndName implements Writa
         putListIfNotNull(result, PROCESS_GROUPS_KEY, processGroupSchemas);
         putListIfNotNull(result, INPUT_PORTS_KEY, inputPortSchemas);
         putListIfNotNull(result, OUTPUT_PORTS_KEY, outputPortSchemas);
+        putListIfNotNull(result, FUNNELS_KEY, funnels);
         putListIfNotNull(result, CONNECTIONS_KEY, connections);
         putListIfNotNull(result, REMOTE_PROCESSING_GROUPS_KEY, remoteProcessingGroups);
         return result;
@@ -100,6 +110,10 @@ public class ProcessGroupSchema extends BaseSchemaWithIdAndName implements Writa
         return processors;
     }
 
+    public List<FunnelSchema> getFunnels() { // Returns the funnels field of this process group directly (the same list instance, not a copy).
+        return funnels;
+    }
+
     public List<ConnectionSchema> getConnections() {
         return connections;
     }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithId.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithId.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithId.java
new file mode 100644
index 0000000..96b47a7
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithId.java
@@ -0,0 +1,73 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.nifi.minifi.commons.schema.common;
+
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ID_KEY;
+
+public class BaseSchemaWithId extends BaseSchema implements WritableSchema { // Base for schema elements that carry an id: reads it from the configuration map, validates it, and writes it back out in toMap().
+    public static final Pattern VALID_ID_PATTERN = Pattern.compile("[A-Za-z0-9_-]+"); // one or more ASCII letters, digits, '_' or '-'
+    public static final String ID_DOES_NOT_MATCH_VALID_ID_PATTERN = "Id does not match valid pattern (" + VALID_ID_PATTERN + "): "; // NOTE(review): ends in ": " yet this class passes it to getIssueText as-is, without appending the offending id - confirm that is intended
+
+    private final String wrapperName; // display-name template; may contain the "{id}" placeholder that getWrapperName() substitutes
+    private String id; // not final: can be replaced later through setId()
+
+    public BaseSchemaWithId(Map map, String wrapperName) { // map: raw configuration map; wrapperName: display-name template used when reporting issues
+        id = getId(map, wrapperName); // NOTE(review): invokes an overridable method from the constructor, and hands it the raw template (placeholder not yet substituted)
+        this.wrapperName = wrapperName;
+    }
+
+    protected String getId(Map map, String wrapperName) { // Extraction hook for the id; protected and non-final, so subclasses may override how the id is obtained.
+        return getOptionalKeyAsType(map, ID_KEY, String.class, wrapperName, ""); // requests ID_KEY as a String; the final "" is presumably the fallback when the key is absent - confirm in BaseSchema
+    }
+
+    public void setId(String id) { // Overwrites the id; no validation happens here (it is only checked in getValidationIssues()).
+        this.id = id;
+    }
+
+    public String getId() { // Returns the current id exactly as stored.
+        return id;
+    }
+
+    public String getWrapperName() { // Returns the template with "{id}" replaced by the id, or by the literal "unkown" when StringUtil.isNullOrEmpty(id) holds.
+        return wrapperName.replace("{id}", StringUtil.isNullOrEmpty(id) ? "unkown" : id); // NOTE(review): "unkown" reads like a typo for "unknown"; it is a runtime string, so correcting it would change emitted messages
+    }
+
+    @Override
+    public Map<String, Object> toMap() { // Serializes this element as a map holding only the id under ID_KEY.
+        Map<String, Object> map = mapSupplier.get(); // mapSupplier is not declared in this class (presumably inherited from BaseSchema)
+        map.put(ID_KEY, id);
+        return map;
+    }
+
+    @Override
+    public List<String> getValidationIssues() { // Returns the superclass issues plus at most one id issue: id missing, or id not matching VALID_ID_PATTERN.
+        List<String> validationIssues = super.getValidationIssues(); // appended to below; assumes the superclass hands back a modifiable list - TODO confirm
+        if (StringUtil.isNullOrEmpty(id)) { // a missing id is reported on its own; the pattern check is skipped in that case
+            validationIssues.add(getIssueText(CommonPropertyKeys.ID_KEY, getWrapperName(), IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED));
+        } else if (!VALID_ID_PATTERN.matcher(id).matches()) { // matches() requires the entire id to fit the pattern, not just a substring
+            validationIssues.add(getIssueText(CommonPropertyKeys.ID_KEY, getWrapperName(), ID_DOES_NOT_MATCH_VALID_ID_PATTERN));
+        }
+        return validationIssues;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java
index a1f7bb5..71a4f41 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java
@@ -19,39 +19,18 @@
 
 package org.apache.nifi.minifi.commons.schema.common;
 
-import java.util.List;
 import java.util.Map;
-import java.util.regex.Pattern;
 
-import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ID_KEY;
 import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY;
 
-public abstract class BaseSchemaWithIdAndName extends BaseSchema implements WritableSchema {
-    public static final Pattern VALID_ID_PATTERN = Pattern.compile("[A-Za-z0-9_-]+");
-    public static final String ID_DOES_NOT_MATCH_VALID_ID_PATTERN = "Id does not match valid pattern (" + VALID_ID_PATTERN + ")";
-
-    private final String wrapperName;
-    private String id;
+public abstract class BaseSchemaWithIdAndName extends BaseSchemaWithId implements WritableSchema {
     private String name;
 
     public BaseSchemaWithIdAndName(Map map, String wrapperName) {
-        this.wrapperName = wrapperName;
-        id = getId(map, getWrapperName());
+        super(map, wrapperName);
         name = getOptionalKeyAsType(map, NAME_KEY, String.class, getWrapperName(), "");
     }
 
-    protected String getId(Map map, String wrapperName) {
-        return getOptionalKeyAsType(map, ID_KEY, String.class, wrapperName, "");
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
     public String getName() {
         return name;
     }
@@ -61,25 +40,13 @@ public abstract class BaseSchemaWithIdAndName extends BaseSchema implements Writ
     }
 
     public String getWrapperName() {
-        return wrapperName.replace("{id}", StringUtil.isNullOrEmpty(id) ? "unkown" : id).replace("{name}", StringUtil.isNullOrEmpty(name) ? "unkown" : name);
+        return super.getWrapperName().replace("{name}", StringUtil.isNullOrEmpty(name) ? "unkown" : name);
     }
 
     @Override
     public Map<String, Object> toMap() {
-        Map<String, Object> map = mapSupplier.get();
-        map.put(ID_KEY, id);
+        Map<String, Object> map = super.toMap();
         map.put(NAME_KEY, name);
         return map;
     }
-
-    @Override
-    public List<String> getValidationIssues() {
-        List<String> validationIssues = super.getValidationIssues();
-        if (StringUtil.isNullOrEmpty(id)) {
-            validationIssues.add(getIssueText(CommonPropertyKeys.ID_KEY, getWrapperName(), IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED));
-        } else if (!VALID_ID_PATTERN.matcher(id).matches()) {
-            validationIssues.add(getIssueText(CommonPropertyKeys.ID_KEY, getWrapperName(), ID_DOES_NOT_MATCH_VALID_ID_PATTERN));
-        }
-        return validationIssues;
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java
index a603f3e..dcf0500 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java
@@ -32,6 +32,7 @@ public class CommonPropertyKeys {
     public static final String REMOTE_PROCESSING_GROUPS_KEY = "Remote Processing Groups";
     public static final String INPUT_PORTS_KEY = "Input Ports";
     public static final String OUTPUT_PORTS_KEY = "Output Ports";
+    public static final String FUNNELS_KEY = "Funnels";
     public static final String PROVENANCE_REPO_KEY = "Provenance Repository";
 
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-docs/src/main/markdown/System_Admin_Guide.md
----------------------------------------------------------------------
diff --git a/minifi-docs/src/main/markdown/System_Admin_Guide.md b/minifi-docs/src/main/markdown/System_Admin_Guide.md
index 0836443..f672074 100644
--- a/minifi-docs/src/main/markdown/System_Admin_Guide.md
+++ b/minifi-docs/src/main/markdown/System_Admin_Guide.md
@@ -404,6 +404,7 @@ Remote Processing Groups            | The remote processing groups contained in
 Connections                         | The connections contained in this Process Group. (Defined below)
 Input Ports                         | The input ports contained in this Process Group. (Defined below)
 Output Ports                        | The output ports contained in this Process Group. (Defined below)
+Funnels                             | The funnels contained in this Process Group. (Defined below)
 Process Groups                      | The child Process Groups contained in this Process Group.
 
 ## Input Ports
@@ -424,6 +425,14 @@ These ports provide output from the Process Group they reside on. (Currently onl
 name                     | The name of what this output port will do.
 id                       | The id of this output port.  This needs to be set to a unique filesystem-friendly value (regex: [A-Za-z0-9_-]+)
 
+## Funnels
+
+Funnels can be used to combine outputs from multiple processors into a single connection for ease of design.
+
+*Property*               | *Description*
+--------------------     | -------------
+id                       | The id of this funnel.  This needs to be set to a unique filesystem-friendly value (regex: [A-Za-z0-9_-]+)
+
 ## Connections
 
 There can be multiple connections in this version of MiNiFi. The "Connections" subsection is a list of connections. Each connection must specify these properties.

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/config.yml
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/config.yml b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/config.yml
index c88e47e..a6d13f6 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/config.yml
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/config.yml
@@ -59,5 +59,6 @@ Processors: []
 Process Groups: []
 Input Ports: []
 Output Ports: []
+Funnels: []
 Connections: []
 Remote Processing Groups: []

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMain.java
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMain.java b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMain.java
index e62392d..b726c5a 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMain.java
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMain.java
@@ -63,7 +63,6 @@ public class ConfigMain {
     public static final int ERR_UNABLE_TO_OPEN_OUTPUT = 2;
     public static final int ERR_UNABLE_TO_OPEN_INPUT = 3;
     public static final int ERR_UNABLE_TO_READ_TEMPLATE = 4;
-    public static final int ERR_UNABLE_TO_TRANSFORM_TEMPLATE = 5;
     public static final int ERR_UNABLE_TO_PARSE_CONFIG = 6;
     public static final int ERR_INVALID_CONFIG = 7;
     public static final int ERR_UNABLE_TO_CLOSE_CONFIG = 8;
@@ -171,7 +170,8 @@ public class ConfigMain {
                 StringBuilder name = new StringBuilder();
                 ConnectableDTO connectionSource = connection.getSource();
                 if (connectionSource != null) {
-                    name.append(connectionSource.getName());
+                    String connectionSourceName = connectionSource.getName();
+                    name.append(StringUtil.isNullOrEmpty(connectionSourceName) ? connectionSource.getId() : connectionSourceName);
                 }
                 name.append("/");
                 if (connection.getSelectedRelationships() != null && connection.getSelectedRelationships().size() > 0) {
@@ -180,7 +180,8 @@ public class ConfigMain {
                 name.append("/");
                 ConnectableDTO connectionDestination = connection.getDestination();
                 if (connectionDestination != null) {
-                    name.append(connectionDestination.getName());
+                    String connectionDestinationName = connectionDestination.getName();
+                    name.append(StringUtil.isNullOrEmpty(connectionDestinationName) ? connectionDestination.getId() : connectionDestinationName);
                 }
                 connection.setName(name.toString());
             }
@@ -203,14 +204,9 @@ public class ConfigMain {
         nullToEmpty(flowSnippetDTO.getProcessGroups()).stream().map(ProcessGroupDTO::getContents).forEach(f -> getAllFlowSnippets(f, result));
     }
 
-    public static ConfigSchema transformTemplateToSchema(InputStream source) throws JAXBException, IOException, SchemaLoaderException {
+    public static ConfigSchema transformTemplateToSchema(InputStream source) throws JAXBException, IOException {
         try {
             TemplateDTO templateDTO = (TemplateDTO) JAXBContext.newInstance(TemplateDTO.class).createUnmarshaller().unmarshal(source);
-
-            if (templateDTO.getSnippet().getFunnels().size() != 0){
-                throw new SchemaLoaderException("Funnels are not currently supported in MiNiFi. Please remove any from the template and try again.");
-            }
-
             enrichFlowSnippetDTO(templateDTO.getSnippet());
             ConfigSchema configSchema = new ConfigSchemaFunction().apply(templateDTO);
             return configSchema;
@@ -329,10 +325,6 @@ public class ConfigMain {
                 System.out.println();
                 printTransformUsage();
                 return ERR_UNABLE_TO_READ_TEMPLATE;
-            } catch (SchemaLoaderException e) {
-                System.out.println("Error transforming template to YAML. (" + e.getMessage() + ")");
-                System.out.println();
-                return ERR_UNABLE_TO_TRANSFORM_TEMPLATE;
             }
         } catch (FileNotFoundException e) {
             return handleErrorOpeningInput(args[1], ConfigMain::printTransformUsage, e);

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java
index 9cdccf5..d6aaabf 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java
@@ -19,10 +19,12 @@ package org.apache.nifi.minifi.toolkit.configuration.dto;
 
 import org.apache.nifi.minifi.commons.schema.ConfigSchema;
 import org.apache.nifi.minifi.commons.schema.ConnectionSchema;
+import org.apache.nifi.minifi.commons.schema.FunnelSchema;
 import org.apache.nifi.minifi.commons.schema.PortSchema;
 import org.apache.nifi.minifi.commons.schema.ProcessGroupSchema;
 import org.apache.nifi.minifi.commons.schema.ProcessorSchema;
 import org.apache.nifi.minifi.commons.schema.RemoteProcessingGroupSchema;
+import org.apache.nifi.minifi.commons.schema.common.CollectionUtil;
 import org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys;
 import org.apache.nifi.minifi.commons.schema.common.StringUtil;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
@@ -44,20 +46,23 @@ public class ConfigSchemaFunction implements Function<TemplateDTO, ConfigSchema>
     private final FlowControllerSchemaFunction flowControllerSchemaFunction;
     private final ProcessorSchemaFunction processorSchemaFunction;
     private final ConnectionSchemaFunction connectionSchemaFunction;
+    private final FunnelSchemaFunction funnelSchemaFunction;
     private final RemoteProcessingGroupSchemaFunction remoteProcessingGroupSchemaFunction;
     private final PortSchemaFunction inputPortSchemaFunction;
     private final PortSchemaFunction outputPortSchemaFunction;
 
     public ConfigSchemaFunction() {
-        this(new FlowControllerSchemaFunction(), new ProcessorSchemaFunction(), new ConnectionSchemaFunction(), new RemoteProcessingGroupSchemaFunction(new RemoteInputPortSchemaFunction()),
-                new PortSchemaFunction(INPUT_PORTS_KEY), new PortSchemaFunction(OUTPUT_PORTS_KEY));
+        this(new FlowControllerSchemaFunction(), new ProcessorSchemaFunction(), new ConnectionSchemaFunction(), new FunnelSchemaFunction(), new RemoteProcessingGroupSchemaFunction(
+                new RemoteInputPortSchemaFunction()), new PortSchemaFunction(INPUT_PORTS_KEY), new PortSchemaFunction(OUTPUT_PORTS_KEY));
     }
 
     public ConfigSchemaFunction(FlowControllerSchemaFunction flowControllerSchemaFunction, ProcessorSchemaFunction processorSchemaFunction, ConnectionSchemaFunction connectionSchemaFunction,
-                                RemoteProcessingGroupSchemaFunction remoteProcessingGroupSchemaFunction, PortSchemaFunction inputPortSchemaFunction, PortSchemaFunction outputPortSchemaFunction) {
+                                FunnelSchemaFunction funnelSchemaFunction, RemoteProcessingGroupSchemaFunction remoteProcessingGroupSchemaFunction, PortSchemaFunction inputPortSchemaFunction,
+                                PortSchemaFunction outputPortSchemaFunction) {
         this.flowControllerSchemaFunction = flowControllerSchemaFunction;
         this.processorSchemaFunction = processorSchemaFunction;
         this.connectionSchemaFunction = connectionSchemaFunction;
+        this.funnelSchemaFunction = funnelSchemaFunction;
         this.remoteProcessingGroupSchemaFunction = remoteProcessingGroupSchemaFunction;
         this.inputPortSchemaFunction = inputPortSchemaFunction;
         this.outputPortSchemaFunction = outputPortSchemaFunction;
@@ -95,14 +100,18 @@ public class ConfigSchemaFunction implements Function<TemplateDTO, ConfigSchema>
                 .map(ProcessorSchema::toMap)
                 .collect(Collectors.toList()));
 
-
-
         map.put(CommonPropertyKeys.CONNECTIONS_KEY, nullToEmpty(snippet.getConnections()).stream()
                 .map(connectionSchemaFunction)
                 .sorted(Comparator.comparing(ConnectionSchema::getName))
                 .map(ConnectionSchema::toMap)
                 .collect(Collectors.toList()));
 
+        map.put(CommonPropertyKeys.FUNNELS_KEY, CollectionUtil.nullToEmpty(snippet.getFunnels()).stream()
+                .map(funnelSchemaFunction)
+                .sorted(Comparator.comparing(FunnelSchema::getId))
+                .map(FunnelSchema::toMap)
+                .collect(Collectors.toList()));
+
         map.put(CommonPropertyKeys.REMOTE_PROCESSING_GROUPS_KEY, nullToEmpty(snippet.getRemoteProcessGroups()).stream()
                 .map(remoteProcessingGroupSchemaFunction)
                 .sorted(Comparator.comparing(RemoteProcessingGroupSchema::getName))

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/FunnelSchemaFunction.java
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/FunnelSchemaFunction.java b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/FunnelSchemaFunction.java
new file mode 100644
index 0000000..a03b105
--- /dev/null
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/FunnelSchemaFunction.java
@@ -0,0 +1,38 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.dto;
+
+import org.apache.nifi.minifi.commons.schema.FunnelSchema;
+import org.apache.nifi.web.api.dto.FunnelDTO;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ID_KEY;
+
+public class FunnelSchemaFunction implements Function<FunnelDTO, FunnelSchema> { // Converts a FunnelDTO into a FunnelSchema, carrying over only the id.
+    @Override
+    public FunnelSchema apply(FunnelDTO funnelDTO) { // funnelDTO is dereferenced without a null check, so callers must pass a non-null DTO
+        Map<String, Object> map = new HashMap<>(); // intermediate map in the shape the FunnelSchema constructor accepts
+        map.put(ID_KEY, funnelDTO.getId()); // the id is the only DTO property copied
+        return new FunnelSchema(map);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java b/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java
index d61a641..9f7559e 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.minifi.toolkit.configuration;
 
 import org.apache.commons.io.Charsets;
+import org.apache.nifi.controller.repository.io.LimitedInputStream;
 import org.apache.nifi.minifi.commons.schema.ConfigSchema;
 import org.apache.nifi.minifi.commons.schema.ConnectionSchema;
 import org.apache.nifi.minifi.commons.schema.ProcessorSchema;
@@ -148,8 +149,8 @@ public class ConfigMainTest {
     @Test
     public void testTransformErrorTransformingTemplate() throws FileNotFoundException {
         when(pathInputStreamFactory.create(testInput)).thenAnswer(invocation ->
-                ConfigMainTest.class.getClassLoader().getResourceAsStream("TemplateWithFunnel.xml"));
-        assertEquals(ConfigMain.ERR_UNABLE_TO_TRANSFORM_TEMPLATE, configMain.execute(new String[]{ConfigMain.TRANSFORM, testInput, testOutput}));
+                new LimitedInputStream(ConfigMainTest.class.getClassLoader().getResourceAsStream("TemplateWithFunnel.xml"), 25));
+        assertEquals(ConfigMain.ERR_UNABLE_TO_READ_TEMPLATE, configMain.execute(new String[]{ConfigMain.TRANSFORM, testInput, testOutput}));
     }
 
     @Test
@@ -186,6 +187,11 @@ public class ConfigMainTest {
     }
 
     @Test
+    public void testTransformRoundTripStressTestFrameworkFunnel() throws IOException, JAXBException, SchemaLoaderException {
+        transformRoundTrip("StressTestFrameworkFunnel");
+    }
+
+    @Test
     public void testTransformRoundTripMultipleRelationships() throws IOException, JAXBException, SchemaLoaderException {
         transformRoundTrip("MultipleRelationships");
     }
@@ -210,8 +216,8 @@ public class ConfigMainTest {
         ConfigMain.transformTemplateToSchema(getClass().getClassLoader().getResourceAsStream("TemplateWithInputPort.xml")).toMap();
     }
 
-    @Test(expected = SchemaLoaderException.class)
-    public void testFailToTransformFunnel() throws IOException, JAXBException, SchemaLoaderException {
+    @Test
+    public void testSuccessTransformFunnel() throws IOException, JAXBException, SchemaLoaderException {
         ConfigMain.transformTemplateToSchema(getClass().getClassLoader().getResourceAsStream("TemplateWithFunnel.xml")).toMap();
     }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/CsvToJson.yml
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/CsvToJson.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/CsvToJson.yml
index d03bd16..3707335 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/CsvToJson.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/CsvToJson.yml
@@ -151,6 +151,7 @@ Processors:
 Process Groups: []
 Input Ports: []
 Output Ports: []
+Funnels: []
 Connections:
 - name: ExtractText/matched/ReplaceText2
   id: 56ef3e2e-ee35-4598-9fbe-ae86050960b0

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/DecompressionCircularFlow.yml
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/DecompressionCircularFlow.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/DecompressionCircularFlow.yml
index d2f90b2..c27fb62 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/DecompressionCircularFlow.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/DecompressionCircularFlow.yml
@@ -190,6 +190,7 @@ Processors:
 Process Groups: []
 Input Ports: []
 Output Ports: []
+Funnels: []
 Connections:
 - name: Compressed?/gzip/Uncompress GZIP
   id: 5de215d5-9f7e-414b-98aa-2edaa0514d99

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml
index aa7d6d5..b518f28 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml
@@ -169,6 +169,7 @@ Processors:
 Process Groups: []
 Input Ports: []
 Output Ports: []
+Funnels: []
 Connections:
 - name: Route On Status Code/200/LogAttribute
   id: 3039718a-bb40-4811-9b74-ecbe926daae8

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MultipleRelationships.yml
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MultipleRelationships.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MultipleRelationships.yml
index 12ed7e1..4e3b479 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MultipleRelationships.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MultipleRelationships.yml
@@ -117,6 +117,7 @@ Processors:
 Process Groups: []
 Input Ports: []
 Output Ports: []
+Funnels: []
 Connections:
 - name: GenerateFlowFile/success/UpdateAttribute
   id: 7c79cce3-0157-1000-0000-000000000000

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ProcessGroupsAndRemoteProcessGroups.yml
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ProcessGroupsAndRemoteProcessGroups.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ProcessGroupsAndRemoteProcessGroups.yml
index 101a360..7783c53 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ProcessGroupsAndRemoteProcessGroups.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ProcessGroupsAndRemoteProcessGroups.yml
@@ -142,6 +142,7 @@ Process Groups:
     Output Ports:
     - id: 207a6d92-0158-1000-0000-000000000000
       name: output
+    Funnels: []
     Connections:
     - id: 21a6abb9-0158-1000-0000-000000000000
       name: UpdateAttribute/success/21a39aba-0158-1000-a1a0-1b55bcddcd72
@@ -190,6 +191,7 @@ Process Groups:
   Output Ports:
   - id: 2079b327-0158-1000-0000-000000000000
     name: output
+  Funnels: []
   Connections:
   - id: 21a5b1f1-0158-1000-0000-000000000000
     name: UpdateAttribute/success/21a2fb5e-0158-1000-3b5e-5a7d3aaee01b
@@ -243,6 +245,7 @@ Process Groups:
       use compression: false
 Input Ports: []
 Output Ports: []
+Funnels: []
 Connections:
 - id: 2077bf8f-0158-1000-0000-000000000000
   name: GenerateFlowFile/success/UpdateAttribute

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ReplaceTextExpressionLanguageCSVReformatting.yml
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ReplaceTextExpressionLanguageCSVReformatting.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ReplaceTextExpressionLanguageCSVReformatting.yml
index 36bdcd8..64a9108 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ReplaceTextExpressionLanguageCSVReformatting.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/ReplaceTextExpressionLanguageCSVReformatting.yml
@@ -126,6 +126,7 @@ Processors:
 Process Groups: []
 Input Ports: []
 Output Ports: []
+Funnels: []
 Connections:
 - name: Generate Empty File/success/Set CSV Content
   id: ca71a875-0ff5-41ef-bbe0-da5de0ca1e08

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/12a58a86/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/StressTestFramework.yml
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/StressTestFramework.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/StressTestFramework.yml
index d135be8..2df4526 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/StressTestFramework.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/StressTestFramework.yml
@@ -101,6 +101,7 @@ Processors:
 Process Groups: []
 Input Ports: []
 Output Ports: []
+Funnels: []
 Connections:
 - name: GenerateFlowFile/success/UpdateAttribute
   id: 0e6873cc-cb9d-4e98-92aa-a3319f4c1b02