You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/12/13 14:54:54 UTC
[2/2] nifi-minifi git commit: MINIFI-153 - Adding proxy settings to remote process group
MINIFI-153 - Adding proxy settings to remote process group
This closes #64
Signed-off-by: Joseph Percivall <JP...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/94869042
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/94869042
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/94869042
Branch: refs/heads/master
Commit: 94869042fdfeec21d0fbcc4b4c18886464972b05
Parents: cbb2bdd
Author: Bryan Rosander <br...@apache.org>
Authored: Tue Dec 6 09:59:31 2016 -0500
Committer: Joseph Percivall <JP...@apache.org>
Committed: Tue Dec 13 09:54:34 2016 -0500
----------------------------------------------------------------------
.../bootstrap/util/ConfigTransformer.java | 9 +
.../bootstrap/util/ConfigTransformerTest.java | 22 +-
...okeHttpMiNiFiProxyNoPasswordTemplateTest.yml | 302 +++++++++++++++++++
...nvokeHttpMiNiFiProxyPasswordTemplateTest.yml | 302 +++++++++++++++++++
.../minifi/commons/schema/ConfigSchema.java | 31 +-
.../schema/RemoteProcessGroupSchema.java | 99 +++++-
.../commons/schema/common/BaseSchema.java | 21 +-
.../schema/common/BaseSchemaWithIdAndName.java | 2 +-
.../schema/common/CollectionOverlap.java | 56 ++++
.../commons/schema/v1/ConfigSchemaV1.java | 9 +-
.../commons/schema/v2/ConfigSchemaV2.java | 94 ++----
.../commons/schema/v2/ProcessGroupSchemaV2.java | 26 +-
.../schema/v2/RemoteProcessGroupSchemaV2.java | 101 +++++++
.../schema/RemoteProcessGroupSchemaTest.java | 56 +++-
.../v2/RemoteProcessGroupSchemaV2Test.java | 82 +++++
.../src/main/markdown/System_Admin_Guide.md | 7 +-
.../dto/RemoteProcessGroupSchemaFunction.java | 4 +
.../toolkit/configuration/ConfigMainTest.java | 32 +-
.../InvokeHttpMiNiFiTemplateTest-v2.yml | 297 ++++++++++++++++++
.../resources/InvokeHttpMiNiFiTemplateTest.xml | 7 +-
.../resources/InvokeHttpMiNiFiTemplateTest.yml | 6 +-
.../ProcessGroupsAndRemoteProcessGroups-v2.yml | 287 ++++++++++++++++++
.../ProcessGroupsAndRemoteProcessGroups.yml | 8 +
.../test/resources/SimpleTailFileToRPG-v2.yml | 113 +++++++
.../src/test/resources/SimpleTailFileToRPG.yml | 4 +
25 files changed, 1816 insertions(+), 161 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/94869042/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
index 55d3285..b724019 100644
--- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
@@ -19,6 +19,7 @@ package org.apache.nifi.minifi.bootstrap.util;
import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
import org.apache.nifi.minifi.bootstrap.exception.InvalidConfigurationException;
import org.apache.nifi.minifi.commons.schema.ComponentStatusRepositorySchema;
@@ -569,6 +570,14 @@ public final class ConfigTransformer {
addTextElement(element, "yieldPeriod", remoteProcessGroupProperties.getYieldPeriod());
addTextElement(element, "transmitting", "true");
addTextElement(element, "transportProtocol", remoteProcessGroupProperties.getTransportProtocol());
+ addTextElement(element, "proxyHost", remoteProcessGroupProperties.getProxyHost());
+ if (remoteProcessGroupProperties.getProxyPort() != null) {
+ addTextElement(element, "proxyPort", Integer.toString(remoteProcessGroupProperties.getProxyPort()));
+ }
+ addTextElement(element, "proxyUser", remoteProcessGroupProperties.getProxyUser());
+ if (!StringUtils.isEmpty(remoteProcessGroupProperties.getProxyPassword())) {
+ addTextElement(element, "proxyPassword", remoteProcessGroupProperties.getProxyPassword());
+ }
List<RemoteInputPortSchema> remoteInputPorts = remoteProcessGroupProperties.getInputPorts();
for (RemoteInputPortSchema remoteInputPortSchema : remoteInputPorts) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/94869042/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
index be88cb4..2cb3a9c 100644
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
@@ -119,6 +119,21 @@ public class ConfigTransformerTest {
testConfigFileTransform("config-funnel-and-rpg.yml");
}
+ @Test
+ public void testRpgTransform() throws Exception {
+ testConfigFileTransform("config-multiple-RPGs.yml");
+ }
+
+ @Test
+ public void testRpgProxyNoPassTransform() throws Exception {
+ testConfigFileTransform("InvokeHttpMiNiFiProxyNoPasswordTemplateTest.yml");
+ }
+
+ @Test
+ public void testRpgProxyPassTransform() throws Exception {
+ testConfigFileTransform("InvokeHttpMiNiFiProxyPasswordTemplateTest.yml");
+ }
+
public void testConfigFileTransform(String configFile) throws Exception {
ConfigSchema configSchema = SchemaLoader.loadConfigSchemaFromYaml(ConfigTransformerTest.class.getClassLoader().getResourceAsStream(configFile));
@@ -215,7 +230,12 @@ public class ConfigTransformerTest {
assertEquals(remoteProcessingGroupSchema.getUrl(), getText(element, "url"));
assertEquals(remoteProcessingGroupSchema.getTimeout(), getText(element, "timeout"));
assertEquals(remoteProcessingGroupSchema.getYieldPeriod(), getText(element, "yieldPeriod"));
-
+ assertEquals(remoteProcessingGroupSchema.getTransportProtocol(), getText(element, "transportProtocol"));
+ assertEquals(remoteProcessingGroupSchema.getProxyHost(), getText(element, "proxyHost"));
+ String proxyPortText = getText(element, "proxyPort");
+ assertEquals(remoteProcessingGroupSchema.getProxyPort(), StringUtil.isNullOrEmpty(proxyPortText) ? null : Integer.parseInt(proxyPortText));
+ assertEquals(remoteProcessingGroupSchema.getProxyUser(), getText(element, "proxyUser"));
+ assertEquals(remoteProcessingGroupSchema.getProxyPassword(), getText(element, "proxyPassword"));
NodeList inputPortElements = (NodeList) xPathFactory.newXPath().evaluate("inputPort", element, XPathConstants.NODESET);
assertEquals(remoteProcessingGroupSchema.getInputPorts().size(), inputPortElements.getLength());
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/94869042/minifi-bootstrap/src/test/resources/InvokeHttpMiNiFiProxyNoPasswordTemplateTest.yml
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/resources/InvokeHttpMiNiFiProxyNoPasswordTemplateTest.yml b/minifi-bootstrap/src/test/resources/InvokeHttpMiNiFiProxyNoPasswordTemplateTest.yml
new file mode 100644
index 0000000..084da2d
--- /dev/null
+++ b/minifi-bootstrap/src/test/resources/InvokeHttpMiNiFiProxyNoPasswordTemplateTest.yml
@@ -0,0 +1,302 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+MiNiFi Config Version: 3
+Flow Controller:
+ name: InvokeHttpMiNiFiTemplateTest2
+ comment: ''
+Core Properties:
+ flow controller graceful shutdown period: 10 sec
+ flow service write delay interval: 500 ms
+ administrative yield duration: 30 sec
+ bored yield duration: 10 millis
+ max concurrent threads: 1
+FlowFile Repository:
+ partitions: 256
+ checkpoint interval: 2 mins
+ always sync: false
+ Swap:
+ threshold: 20000
+ in period: 5 sec
+ in threads: 1
+ out period: 5 sec
+ out threads: 4
+Content Repository:
+ content claim max appendable size: 10 MB
+ content claim max flow files: 100
+ always sync: false
+Provenance Repository:
+ provenance rollover time: 1 min
+Component Status Repository:
+ buffer size: 1440
+ snapshot frequency: 1 min
+Security Properties:
+ keystore: ''
+ keystore type: ''
+ keystore password: ''
+ key password: ''
+ truststore: ''
+ truststore type: ''
+ truststore password: ''
+ ssl protocol: ''
+ Sensitive Props:
+ key:
+ algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
+ provider: BC
+Processors:
+- id: 0a73c5e4-7216-4cdf-9008-ace353478d55
+ name: LogAttribute
+ class: org.apache.nifi.processors.standard.LogAttribute
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 0 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list:
+ - success
+ Properties:
+ Attributes to Ignore:
+ Attributes to Log:
+ Log Level: info
+ Log Payload: 'false'
+ Log prefix:
+- id: 0bda7282-7a4f-4d39-83e5-ea86aa63f1dc
+ name: LogAttribute2
+ class: org.apache.nifi.processors.standard.LogAttribute
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 0 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list:
+ - success
+ Properties:
+ Attributes to Ignore:
+ Attributes to Log:
+ Log Level: info
+ Log Payload: 'false'
+ Log prefix:
+- id: dcff217a-a25c-424f-b9f1-d82c55bfa61b
+ name: Route On Status Code
+ class: org.apache.nifi.processors.standard.RouteOnAttribute
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 0 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list: []
+ Properties:
+ '200': ${invokehttp.status.code:equals(200)}
+ Routing Strategy: Route to Property name
+- id: 66649998-3b08-4b41-9f07-a51d999743f6
+ name: Search Google
+ class: org.apache.nifi.processors.standard.InvokeHTTP
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 0 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list: []
+ Properties:
+ Add Response Headers to Request: 'false'
+ Always Output Response: 'false'
+ Attributes to Send:
+ Basic Authentication Password:
+ Basic Authentication Username:
+ Connection Timeout: 5 secs
+ Content-Type: ${mime.type}
+ Digest Authentication: 'false'
+ Follow Redirects: 'True'
+ HTTP Method: GET
+ Include Date Header: 'True'
+ Max Length To Put In Attribute: '256'
+ Penalize on "No Retry": 'false'
+ Proxy Host:
+ Proxy Port:
+ Put Response Body In Attribute:
+ Read Timeout: 15 secs
+ Remote URL: http://www.google.com/search?q=${q}&rct=j
+ SSL Context Service:
+ Trusted Hostname:
+ Use Chunked Encoding: 'false'
+ invokehttp-proxy-password:
+ invokehttp-proxy-user:
+ send-message-body: 'true'
+- id: fcb378d7-6438-41d3-8485-7da85e870ca4
+ name: Source
+ class: org.apache.nifi.processors.standard.GenerateFlowFile
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 30 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list: []
+ Properties:
+ Batch Size: '1'
+ Data Format: Binary
+ File Size: 10 b
+ Unique FlowFiles: 'false'
+- id: 6638cd15-0aaf-4bba-a18a-722cf85038b6
+ name: q=nifi
+ class: org.apache.nifi.processors.attributes.UpdateAttribute
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 0 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list: []
+ Properties:
+ Delete Attributes Expression:
+ q: nifi
+Controller Services: []
+Process Groups: []
+Input Ports: []
+Output Ports: []
+Funnels: []
+Connections:
+- id: 3039718a-bb40-4811-9b74-ecbe926daae8
+ name: Route On Status Code/200/LogAttribute
+ source id: dcff217a-a25c-424f-b9f1-d82c55bfa61b
+ source relationship names:
+ - '200'
+ destination id: 0a73c5e4-7216-4cdf-9008-ace353478d55
+ max work queue size: 0
+ max work queue data size: 0 MB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+- id: 4a7bdaf1-eba0-4317-92f2-93cef6240c28
+ name: Route On Status Code/200/da15f83d-0158-1000-7f8d-eb4d2652f870
+ source id: dcff217a-a25c-424f-b9f1-d82c55bfa61b
+ source relationship names:
+ - '200'
+ destination id: da15f83d-0158-1000-7f8d-eb4d2652f870
+ max work queue size: 10000
+ max work queue data size: 1 GB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+- id: 9f948a25-afb4-4598-9a01-cfb7e3aa5920
+ name: Route On Status Code/unmatched/LogAttribute2
+ source id: dcff217a-a25c-424f-b9f1-d82c55bfa61b
+ source relationship names:
+ - unmatched
+ destination id: 0bda7282-7a4f-4d39-83e5-ea86aa63f1dc
+ max work queue size: 10000
+ max work queue data size: 1 GB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+- id: 7299c143-46f6-4a94-aa24-ec47db16f73e
+ name: Route On Status Code/unmatched/da15f83d-0158-1000-7f8d-eb4d2652f870
+ source id: dcff217a-a25c-424f-b9f1-d82c55bfa61b
+ source relationship names:
+ - unmatched
+ destination id: da15f83d-0158-1000-7f8d-eb4d2652f870
+ max work queue size: 10000
+ max work queue data size: 1 GB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+- id: 451b153a-8a4e-4bed-99ab-5159e499bcf1
+ name: Search Google/Failure/Route On Status Code
+ source id: 66649998-3b08-4b41-9f07-a51d999743f6
+ source relationship names:
+ - Failure
+ destination id: dcff217a-a25c-424f-b9f1-d82c55bfa61b
+ max work queue size: 10000
+ max work queue data size: 1 GB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+- id: ddbc1711-8cda-4418-b6ee-895763ef1a97
+ name: Search Google/No Retry/Route On Status Code
+ source id: 66649998-3b08-4b41-9f07-a51d999743f6
+ source relationship names:
+ - No Retry
+ destination id: dcff217a-a25c-424f-b9f1-d82c55bfa61b
+ max work queue size: 10000
+ max work queue data size: 1 GB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+- id: b97044aa-3401-49df-86ab-bc440cd8f6c8
+ name: Search Google/Original/Route On Status Code
+ source id: 66649998-3b08-4b41-9f07-a51d999743f6
+ source relationship names:
+ - Original
+ destination id: dcff217a-a25c-424f-b9f1-d82c55bfa61b
+ max work queue size: 10000
+ max work queue data size: 1 GB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+- id: c6a06d31-51fb-488f-bcde-495102f8c93f
+ name: Search Google/Response/Route On Status Code
+ source id: 66649998-3b08-4b41-9f07-a51d999743f6
+ source relationship names:
+ - Response
+ destination id: dcff217a-a25c-424f-b9f1-d82c55bfa61b
+ max work queue size: 10000
+ max work queue data size: 1 GB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+- id: 3172a4b6-c35b-4d2f-a7c1-68eb16cc1b37
+ name: Search Google/Retry/Route On Status Code
+ source id: 66649998-3b08-4b41-9f07-a51d999743f6
+ source relationship names:
+ - Retry
+ destination id: dcff217a-a25c-424f-b9f1-d82c55bfa61b
+ max work queue size: 10000
+ max work queue data size: 1 GB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+- id: 2ef71cd8-eb24-4653-8526-34ab84e79328
+ name: Source/success/q=nifi
+ source id: fcb378d7-6438-41d3-8485-7da85e870ca4
+ source relationship names:
+ - success
+ destination id: 6638cd15-0aaf-4bba-a18a-722cf85038b6
+ max work queue size: 0
+ max work queue data size: 0 MB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+- id: ee5ee5bf-a552-43cf-9514-c14d40350bff
+ name: q=nifi/success/Search Google
+ source id: 6638cd15-0aaf-4bba-a18a-722cf85038b6
+ source relationship names:
+ - success
+ destination id: 66649998-3b08-4b41-9f07-a51d999743f6
+ max work queue size: 0
+ max work queue data size: 0 MB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+Remote Process Groups:
+- id: 5b0a5c18-a02e-4a85-8080-0187ee679763
+ name: ''
+ url: http://nifi.minifi2:8080/nifi
+ comment: ''
+ timeout: 30 sec
+ yield period: 10 sec
+ transport protocol: HTTP
+ proxy host: squidnp.minifi
+ proxy port: 3128
+ proxy user: ''
+ proxy password: ''
+ Input Ports:
+ - id: da15f83d-0158-1000-7f8d-eb4d2652f870
+ name: response
+ comment: ''
+ max concurrent tasks: 1
+ use compression: false
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/94869042/minifi-bootstrap/src/test/resources/InvokeHttpMiNiFiProxyPasswordTemplateTest.yml
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/resources/InvokeHttpMiNiFiProxyPasswordTemplateTest.yml b/minifi-bootstrap/src/test/resources/InvokeHttpMiNiFiProxyPasswordTemplateTest.yml
new file mode 100644
index 0000000..348219b
--- /dev/null
+++ b/minifi-bootstrap/src/test/resources/InvokeHttpMiNiFiProxyPasswordTemplateTest.yml
@@ -0,0 +1,302 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+MiNiFi Config Version: 3
+Flow Controller:
+ name: InvokeHttpMiNiFiTemplateTest2
+ comment: ''
+Core Properties:
+ flow controller graceful shutdown period: 10 sec
+ flow service write delay interval: 500 ms
+ administrative yield duration: 30 sec
+ bored yield duration: 10 millis
+ max concurrent threads: 1
+FlowFile Repository:
+ partitions: 256
+ checkpoint interval: 2 mins
+ always sync: false
+ Swap:
+ threshold: 20000
+ in period: 5 sec
+ in threads: 1
+ out period: 5 sec
+ out threads: 4
+Content Repository:
+ content claim max appendable size: 10 MB
+ content claim max flow files: 100
+ always sync: false
+Provenance Repository:
+ provenance rollover time: 1 min
+Component Status Repository:
+ buffer size: 1440
+ snapshot frequency: 1 min
+Security Properties:
+ keystore: ''
+ keystore type: ''
+ keystore password: ''
+ key password: ''
+ truststore: ''
+ truststore type: ''
+ truststore password: ''
+ ssl protocol: ''
+ Sensitive Props:
+ key:
+ algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
+ provider: BC
+Processors:
+- id: 0a73c5e4-7216-4cdf-9008-ace353478d55
+ name: LogAttribute
+ class: org.apache.nifi.processors.standard.LogAttribute
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 0 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list:
+ - success
+ Properties:
+ Attributes to Ignore:
+ Attributes to Log:
+ Log Level: info
+ Log Payload: 'false'
+ Log prefix:
+- id: 0bda7282-7a4f-4d39-83e5-ea86aa63f1dc
+ name: LogAttribute2
+ class: org.apache.nifi.processors.standard.LogAttribute
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 0 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list:
+ - success
+ Properties:
+ Attributes to Ignore:
+ Attributes to Log:
+ Log Level: info
+ Log Payload: 'false'
+ Log prefix:
+- id: dcff217a-a25c-424f-b9f1-d82c55bfa61b
+ name: Route On Status Code
+ class: org.apache.nifi.processors.standard.RouteOnAttribute
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 0 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list: []
+ Properties:
+ '200': ${invokehttp.status.code:equals(200)}
+ Routing Strategy: Route to Property name
+- id: 66649998-3b08-4b41-9f07-a51d999743f6
+ name: Search Google
+ class: org.apache.nifi.processors.standard.InvokeHTTP
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 0 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list: []
+ Properties:
+ Add Response Headers to Request: 'false'
+ Always Output Response: 'false'
+ Attributes to Send:
+ Basic Authentication Password:
+ Basic Authentication Username:
+ Connection Timeout: 5 secs
+ Content-Type: ${mime.type}
+ Digest Authentication: 'false'
+ Follow Redirects: 'True'
+ HTTP Method: GET
+ Include Date Header: 'True'
+ Max Length To Put In Attribute: '256'
+ Penalize on "No Retry": 'false'
+ Proxy Host:
+ Proxy Port:
+ Put Response Body In Attribute:
+ Read Timeout: 15 secs
+ Remote URL: http://www.google.com/search?q=${q}&rct=j
+ SSL Context Service:
+ Trusted Hostname:
+ Use Chunked Encoding: 'false'
+ invokehttp-proxy-password:
+ invokehttp-proxy-user:
+ send-message-body: 'true'
+- id: fcb378d7-6438-41d3-8485-7da85e870ca4
+ name: Source
+ class: org.apache.nifi.processors.standard.GenerateFlowFile
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 30 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list: []
+ Properties:
+ Batch Size: '1'
+ Data Format: Binary
+ File Size: 10 b
+ Unique FlowFiles: 'false'
+- id: 6638cd15-0aaf-4bba-a18a-722cf85038b6
+ name: q=nifi
+ class: org.apache.nifi.processors.attributes.UpdateAttribute
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 0 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list: []
+ Properties:
+ Delete Attributes Expression:
+ q: nifi
+Controller Services: []
+Process Groups: []
+Input Ports: []
+Output Ports: []
+Funnels: []
+Connections:
+- id: 3039718a-bb40-4811-9b74-ecbe926daae8
+ name: Route On Status Code/200/LogAttribute
+ source id: dcff217a-a25c-424f-b9f1-d82c55bfa61b
+ source relationship names:
+ - '200'
+ destination id: 0a73c5e4-7216-4cdf-9008-ace353478d55
+ max work queue size: 0
+ max work queue data size: 0 MB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+- id: 4a7bdaf1-eba0-4317-92f2-93cef6240c28
+ name: Route On Status Code/200/da15f83d-0158-1000-7f8d-eb4d2652f870
+ source id: dcff217a-a25c-424f-b9f1-d82c55bfa61b
+ source relationship names:
+ - '200'
+ destination id: da15f83d-0158-1000-7f8d-eb4d2652f870
+ max work queue size: 10000
+ max work queue data size: 1 GB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+- id: 9f948a25-afb4-4598-9a01-cfb7e3aa5920
+ name: Route On Status Code/unmatched/LogAttribute2
+ source id: dcff217a-a25c-424f-b9f1-d82c55bfa61b
+ source relationship names:
+ - unmatched
+ destination id: 0bda7282-7a4f-4d39-83e5-ea86aa63f1dc
+ max work queue size: 10000
+ max work queue data size: 1 GB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+- id: 7299c143-46f6-4a94-aa24-ec47db16f73e
+ name: Route On Status Code/unmatched/da15f83d-0158-1000-7f8d-eb4d2652f870
+ source id: dcff217a-a25c-424f-b9f1-d82c55bfa61b
+ source relationship names:
+ - unmatched
+ destination id: da15f83d-0158-1000-7f8d-eb4d2652f870
+ max work queue size: 10000
+ max work queue data size: 1 GB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+- id: 451b153a-8a4e-4bed-99ab-5159e499bcf1
+ name: Search Google/Failure/Route On Status Code
+ source id: 66649998-3b08-4b41-9f07-a51d999743f6
+ source relationship names:
+ - Failure
+ destination id: dcff217a-a25c-424f-b9f1-d82c55bfa61b
+ max work queue size: 10000
+ max work queue data size: 1 GB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+- id: ddbc1711-8cda-4418-b6ee-895763ef1a97
+ name: Search Google/No Retry/Route On Status Code
+ source id: 66649998-3b08-4b41-9f07-a51d999743f6
+ source relationship names:
+ - No Retry
+ destination id: dcff217a-a25c-424f-b9f1-d82c55bfa61b
+ max work queue size: 10000
+ max work queue data size: 1 GB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+- id: b97044aa-3401-49df-86ab-bc440cd8f6c8
+ name: Search Google/Original/Route On Status Code
+ source id: 66649998-3b08-4b41-9f07-a51d999743f6
+ source relationship names:
+ - Original
+ destination id: dcff217a-a25c-424f-b9f1-d82c55bfa61b
+ max work queue size: 10000
+ max work queue data size: 1 GB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+- id: c6a06d31-51fb-488f-bcde-495102f8c93f
+ name: Search Google/Response/Route On Status Code
+ source id: 66649998-3b08-4b41-9f07-a51d999743f6
+ source relationship names:
+ - Response
+ destination id: dcff217a-a25c-424f-b9f1-d82c55bfa61b
+ max work queue size: 10000
+ max work queue data size: 1 GB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+- id: 3172a4b6-c35b-4d2f-a7c1-68eb16cc1b37
+ name: Search Google/Retry/Route On Status Code
+ source id: 66649998-3b08-4b41-9f07-a51d999743f6
+ source relationship names:
+ - Retry
+ destination id: dcff217a-a25c-424f-b9f1-d82c55bfa61b
+ max work queue size: 10000
+ max work queue data size: 1 GB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+- id: 2ef71cd8-eb24-4653-8526-34ab84e79328
+ name: Source/success/q=nifi
+ source id: fcb378d7-6438-41d3-8485-7da85e870ca4
+ source relationship names:
+ - success
+ destination id: 6638cd15-0aaf-4bba-a18a-722cf85038b6
+ max work queue size: 0
+ max work queue data size: 0 MB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+- id: ee5ee5bf-a552-43cf-9514-c14d40350bff
+ name: q=nifi/success/Search Google
+ source id: 6638cd15-0aaf-4bba-a18a-722cf85038b6
+ source relationship names:
+ - success
+ destination id: 66649998-3b08-4b41-9f07-a51d999743f6
+ max work queue size: 0
+ max work queue data size: 0 MB
+ flowfile expiration: 0 sec
+ queue prioritizer class: org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer
+Remote Process Groups:
+- id: 5b0a5c18-a02e-4a85-8080-0187ee679763
+ name: ''
+ url: http://nifi.minifi2:8080/nifi
+ comment: ''
+ timeout: 30 sec
+ yield period: 10 sec
+ transport protocol: HTTP
+ proxy host: squidp.minifi
+ proxy port: 3128
+ proxy user: username
+ proxy password: password
+ Input Ports:
+ - id: da15f83d-0158-1000-7f8d-eb4d2652f870
+ name: response
+ comment: ''
+ max concurrent tasks: 1
+ use compression: false
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/94869042/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
index 7a9bf09..39e5a2f 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
@@ -18,18 +18,16 @@
package org.apache.nifi.minifi.commons.schema;
import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
+import org.apache.nifi.minifi.commons.schema.common.CollectionOverlap;
import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema;
import org.apache.nifi.minifi.commons.schema.common.StringUtil;
import org.apache.nifi.minifi.commons.schema.common.WritableSchema;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMPONENT_STATUS_REPO_KEY;
@@ -121,29 +119,24 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_OUTPUT_PORT_IDS, allOutputPortIds);
// Potential connection sources and destinations need to have unique ids
- OverlapResults<String> overlapResults = findOverlap(new HashSet<>(allProcessorIds), new HashSet<>(allRemoteInputPortIds), new HashSet<>(allInputPortIds), new HashSet<>(allOutputPortIds),
- new HashSet<>(allFunnelIds));
- if (overlapResults.duplicates.size() > 0) {
- addValidationIssue(FOUND_THE_FOLLOWING_DUPLICATE_IDS + overlapResults.duplicates.stream().sorted().collect(Collectors.joining(", ")));
+ CollectionOverlap<String> overlapResults = new CollectionOverlap<>(new HashSet<>(allProcessorIds), new HashSet<>(allRemoteInputPortIds), new HashSet<>(allInputPortIds),
+ new HashSet<>(allOutputPortIds), new HashSet<>(allFunnelIds));
+ if (overlapResults.getDuplicates().size() > 0) {
+ addValidationIssue(FOUND_THE_FOLLOWING_DUPLICATE_IDS + overlapResults.getDuplicates().stream().sorted().collect(Collectors.joining(", ")));
}
allConnectionSchemas.forEach(c -> {
String destinationId = c.getDestinationId();
- if (!StringUtil.isNullOrEmpty(destinationId) && !overlapResults.seen.contains(destinationId)) {
+ if (!StringUtil.isNullOrEmpty(destinationId) && !overlapResults.getElements().contains(destinationId)) {
addValidationIssue(CONNECTION_WITH_ID + c.getId() + HAS_INVALID_DESTINATION_ID + destinationId);
}
String sourceId = c.getSourceId();
- if (!StringUtil.isNullOrEmpty(sourceId) && !overlapResults.seen.contains(sourceId)) {
+ if (!StringUtil.isNullOrEmpty(sourceId) && !overlapResults.getElements().contains(sourceId)) {
addValidationIssue(CONNECTION_WITH_ID + c.getId() + HAS_INVALID_SOURCE_ID + sourceId);
}
});
}
- protected static <T> OverlapResults<T> findOverlap(Collection<T>... collections) {
- Set<T> seen = new HashSet<>();
- return new OverlapResults<>(seen, Arrays.stream(collections).flatMap(c -> c.stream()).sequential().filter(s -> !seen.add(s)).collect(Collectors.toSet()));
- }
-
public static List<ProcessGroupSchema> getAllProcessGroups(ProcessGroupSchema processGroupSchema) {
List<ProcessGroupSchema> result = new ArrayList<>();
addProcessGroups(processGroupSchema, result);
@@ -215,14 +208,4 @@ public class ConfigSchema extends BaseSchema implements WritableSchema, Converta
public ConfigSchema convert() {
return this;
}
-
- private static class OverlapResults<T> {
- private final Set<T> seen;
- private final Set<T> duplicates;
-
- private OverlapResults(Set<T> seen, Set<T> duplicates) {
- this.seen = seen;
- this.duplicates = duplicates;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/94869042/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessGroupSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessGroupSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessGroupSchema.java
index ccec389..e73ff73 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessGroupSchema.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessGroupSchema.java
@@ -18,6 +18,7 @@
package org.apache.nifi.minifi.commons.schema;
import org.apache.nifi.minifi.commons.schema.common.BaseSchemaWithIdAndName;
+import org.apache.nifi.minifi.commons.schema.common.StringUtil;
import java.util.List;
import java.util.Map;
@@ -30,22 +31,22 @@ public class RemoteProcessGroupSchema extends BaseSchemaWithIdAndName {
public static final String URL_KEY = "url";
public static final String TIMEOUT_KEY = "timeout";
public static final String TRANSPORT_PROTOCOL_KEY = "transport protocol";
+ public static final String S2S_PROXY_REQUIRES_HTTP = "Site-To-Site proxy support requires HTTP " + TRANSPORT_PROTOCOL_KEY;
+ public static final String PROXY_HOST_KEY = "proxy host";
+ public static final String PROXY_PORT_KEY = "proxy port";
+ public static final String PROXY_USER_KEY = "proxy user";
+ public static final String PROXY_PASSWORD_KEY = "proxy password";
- private enum transportProtocolOptions {
- RAW("RAW"), HTTP("HTTP");
+ public static final String EXPECTED_PROXY_HOST_IF_PROXY_PORT = "expected " + PROXY_HOST_KEY + " to be set if " + PROXY_PORT_KEY + " is";
+ public static final String EXPECTED_PROXY_HOST_IF_PROXY_USER = "expected " + PROXY_HOST_KEY + " to be set if " + PROXY_USER_KEY + " is";
+ public static final String EXPECTED_PROXY_USER_IF_PROXY_PASSWORD = "expected " + PROXY_USER_KEY + " to be set if " + PROXY_PASSWORD_KEY + " is";
+ public static final String EXPECTED_PROXY_PASSWORD_IF_PROXY_USER = "expected " + PROXY_PASSWORD_KEY + " to be set if " + PROXY_USER_KEY + " is";
- private final String stringValue;
-
- private transportProtocolOptions(final String s) {
- stringValue = s;
- }
-
- public String toString() {
- return stringValue;
- }
+ public enum TransportProtocolOptions {
+ RAW, HTTP;
public static boolean valid(String input) {
- return RAW.stringValue.equals(input) || HTTP.stringValue.equals(input);
+ return RAW.name().equals(input) || HTTP.name().equals(input);
}
}
@@ -53,6 +54,10 @@ public class RemoteProcessGroupSchema extends BaseSchemaWithIdAndName {
public static final String DEFAULT_TIMEOUT = "30 secs";
public static final String DEFAULT_YIELD_PERIOD = "10 sec";
public static final String DEFAULT_TRANSPORT_PROTOCOL= "RAW";
+ public static final String DEFAULT_PROXY_HOST = "";
+ public static final Integer DEFAULT_PROXY_PORT = null;
+ public static final String DEFAULT_PROXY_USER = "";
+ public static final String DEFAULT_PROXY_PASSWORD = "";
private String url;
private List<RemoteInputPortSchema> inputPorts;
@@ -61,6 +66,10 @@ public class RemoteProcessGroupSchema extends BaseSchemaWithIdAndName {
private String timeout = DEFAULT_TIMEOUT;
private String yieldPeriod = DEFAULT_YIELD_PERIOD;
private String transportProtocol = DEFAULT_TRANSPORT_PROTOCOL;
+ private String proxyHost = DEFAULT_PROXY_HOST;
+ private Integer proxyPort = DEFAULT_PROXY_PORT;
+ private String proxyUser = DEFAULT_PROXY_USER;
+ private String proxyPassword = DEFAULT_PROXY_PASSWORD;
public RemoteProcessGroupSchema(Map map) {
super(map, "RemoteProcessGroup(id: {id}, name: {name})");
@@ -78,9 +87,33 @@ public class RemoteProcessGroupSchema extends BaseSchemaWithIdAndName {
yieldPeriod = getOptionalKeyAsType(map, YIELD_PERIOD_KEY, String.class, wrapperName, DEFAULT_YIELD_PERIOD);
transportProtocol = getOptionalKeyAsType(map, TRANSPORT_PROTOCOL_KEY, String.class, wrapperName, DEFAULT_TRANSPORT_PROTOCOL);
- if (!transportProtocolOptions.valid(transportProtocol)){
+ if (!TransportProtocolOptions.valid(transportProtocol)){
addValidationIssue(TRANSPORT_PROTOCOL_KEY, wrapperName, "it must be either 'RAW' or 'HTTP' but is '" + transportProtocol + "'");
}
+
+ proxyHost = getOptionalKeyAsType(map, PROXY_HOST_KEY, String.class, wrapperName, DEFAULT_PROXY_HOST);
+ proxyPort = getOptionalKeyAsType(map, PROXY_PORT_KEY, Integer.class, wrapperName, DEFAULT_PROXY_PORT);
+ proxyUser = getOptionalKeyAsType(map, PROXY_USER_KEY, String.class, wrapperName, DEFAULT_PROXY_USER);
+ proxyPassword = getOptionalKeyAsType(map, PROXY_PASSWORD_KEY, String.class, wrapperName, DEFAULT_PROXY_PASSWORD);
+
+ if (StringUtil.isNullOrEmpty(proxyHost)) {
+ if (proxyPort != null) {
+ addValidationIssue(PROXY_PORT_KEY, wrapperName, EXPECTED_PROXY_HOST_IF_PROXY_PORT);
+ }
+ if (!StringUtil.isNullOrEmpty(proxyUser)) {
+ addValidationIssue(PROXY_USER_KEY, wrapperName, EXPECTED_PROXY_HOST_IF_PROXY_USER);
+ }
+ } else if (!TransportProtocolOptions.HTTP.name().equals(transportProtocol)) {
+ addValidationIssue(PROXY_HOST_KEY, wrapperName, S2S_PROXY_REQUIRES_HTTP);
+ }
+
+ if (StringUtil.isNullOrEmpty(proxyUser)) {
+ if (!StringUtil.isNullOrEmpty(proxyPassword)) {
+ addValidationIssue(PROXY_PASSWORD_KEY, wrapperName, EXPECTED_PROXY_USER_IF_PROXY_PASSWORD);
+ }
+ } else if (StringUtil.isNullOrEmpty(proxyPassword)) {
+ addValidationIssue(PROXY_USER_KEY, wrapperName, EXPECTED_PROXY_PASSWORD_IF_PROXY_USER);
+ }
}
@Override
@@ -91,6 +124,10 @@ public class RemoteProcessGroupSchema extends BaseSchemaWithIdAndName {
result.put(TIMEOUT_KEY, timeout);
result.put(YIELD_PERIOD_KEY, yieldPeriod);
result.put(TRANSPORT_PROTOCOL_KEY, transportProtocol);
+ result.put(PROXY_HOST_KEY, proxyHost);
+ result.put(PROXY_PORT_KEY, proxyPort == null ? "" : proxyPort);
+ result.put(PROXY_USER_KEY, proxyUser);
+ result.put(PROXY_PASSWORD_KEY, proxyPassword);
putListIfNotNull(result, INPUT_PORTS_KEY, inputPorts);
return result;
}
@@ -118,4 +155,40 @@ public class RemoteProcessGroupSchema extends BaseSchemaWithIdAndName {
public String getTransportProtocol() {
return transportProtocol;
}
+
+ public void setProxyHost(String proxyHost) {
+ this.proxyHost = proxyHost;
+ }
+
+ public void setProxyPort(Integer proxyPort) {
+ this.proxyPort = proxyPort;
+ }
+
+ public void setProxyUser(String proxyUser) {
+ this.proxyUser = proxyUser;
+ }
+
+ public void setProxyPassword(String proxyPassword) {
+ this.proxyPassword = proxyPassword;
+ }
+
+ public void setTransportProtocol(String transportProtocol) {
+ this.transportProtocol = transportProtocol;
+ }
+
+ public String getProxyHost() {
+ return proxyHost;
+ }
+
+ public Integer getProxyPort() {
+ return proxyPort;
+ }
+
+ public String getProxyUser() {
+ return proxyUser;
+ }
+
+ public String getProxyPassword() {
+ return proxyPassword;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/94869042/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java
index 7cd82f7..919bba3 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java
@@ -27,20 +27,15 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
-import java.util.regex.Pattern;
import java.util.stream.Collectors;
public abstract class BaseSchema implements Schema {
public static final String IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED = "it was not found and it is required";
public static final String EMPTY_NAME = "empty_name";
- public static final Pattern ID_REPLACE_PATTERN = Pattern.compile("[^A-Za-z0-9_-]");
-
protected final Supplier<Map<String, Object>> mapSupplier;
public BaseSchema() {
@@ -99,7 +94,7 @@ public abstract class BaseSchema implements Schema {
<T> T getKeyAsType(Map valueMap, String key, Class<T> targetClass, String wrapperName, boolean required, T defaultValue) {
Object value = valueMap.get(key);
- if (value == null) {
+ if (value == null || (targetClass != String.class && "".equals(value))) {
if (defaultValue != null) {
return defaultValue;
} else if(required) {
@@ -175,7 +170,7 @@ public abstract class BaseSchema implements Schema {
}
private <T> T interpretValueAsType(Object obj, String key, Class targetClass, String wrapperName, boolean required, boolean instantiateIfNull) {
- if (obj == null) {
+ if (obj == null || (targetClass != String.class && "".equals(obj))) {
if (required){
addValidationIssue(key, wrapperName, "it is a required property but was not found");
} else {
@@ -220,15 +215,9 @@ public abstract class BaseSchema implements Schema {
public static void checkForDuplicates(Consumer<String> duplicateMessageConsumer, String errorMessagePrefix, List<String> strings) {
if (strings != null) {
- Set<String> seen = new HashSet<>();
- Set<String> duplicates = new TreeSet<>();
- for (String string : strings) {
- if (!seen.add(string)) {
- duplicates.add(String.valueOf(string));
- }
- }
- if (duplicates.size() > 0) {
- duplicateMessageConsumer.accept(errorMessagePrefix + duplicates.stream().collect(Collectors.joining(", ")));
+ CollectionOverlap<String> collectionOverlap = new CollectionOverlap<>(strings);
+ if (collectionOverlap.getDuplicates().size() > 0) {
+ duplicateMessageConsumer.accept(errorMessagePrefix + collectionOverlap.getDuplicates().stream().collect(Collectors.joining(", ")));
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/94869042/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java
index 5c9f8c7..344200d 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java
@@ -23,7 +23,7 @@ import java.util.Map;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY;
-public abstract class BaseSchemaWithIdAndName extends BaseSchemaWithId implements WritableSchema {
+public class BaseSchemaWithIdAndName extends BaseSchemaWithId implements WritableSchema {
private String name;
public BaseSchemaWithIdAndName(Map map, String wrapperName) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/94869042/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CollectionOverlap.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CollectionOverlap.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CollectionOverlap.java
new file mode 100644
index 0000000..358c216
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CollectionOverlap.java
@@ -0,0 +1,56 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.nifi.minifi.commons.schema.common;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class CollectionOverlap<T> {
+ private final Set<T> elements;
+ private final Set<T> duplicates;
+
+ public CollectionOverlap(Collection<T>... collections) {
+ this(Arrays.stream(collections).map(c -> c.stream()));
+ }
+
+ public CollectionOverlap(Stream<T>... streams) {
+ this(Arrays.stream(streams).map(Function.identity()));
+ }
+
+ public CollectionOverlap(Stream<Stream<T>> streams) {
+ Set<T> elements = new HashSet<>();
+ this.duplicates = Collections.unmodifiableSet(streams.flatMap(Function.identity()).sequential().filter(s -> !elements.add(s)).collect(Collectors.toSet()));
+ this.elements = Collections.unmodifiableSet(elements);
+ }
+
+ public Set<T> getElements() {
+ return elements;
+ }
+
+ public Set<T> getDuplicates() {
+ return duplicates;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/94869042/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/v1/ConfigSchemaV1.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/v1/ConfigSchemaV1.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/v1/ConfigSchemaV1.java
index 37f34b1..1989b88 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/v1/ConfigSchemaV1.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/v1/ConfigSchemaV1.java
@@ -33,6 +33,7 @@ import org.apache.nifi.minifi.commons.schema.RemoteInputPortSchema;
import org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema;
import org.apache.nifi.minifi.commons.schema.SecurityPropertiesSchema;
import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
+import org.apache.nifi.minifi.commons.schema.common.CollectionOverlap;
import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema;
import org.apache.nifi.minifi.commons.schema.common.StringUtil;
@@ -160,13 +161,7 @@ public class ConfigSchemaV1 extends BaseSchema implements ConvertableSchema<Conf
if (processors != null) {
processors.stream().forEachOrdered(p -> processorNameToIdMap.put(p.getName(), p.getId()));
-
- Set<String> processorNames = new HashSet<>();
- processors.stream().map(ProcessorSchema::getName).forEachOrdered(n -> {
- if (!processorNames.add(n)) {
- duplicateProcessorNames.add(n);
- }
- });
+ duplicateProcessorNames = new CollectionOverlap<>(processors.stream().map(ProcessorSchema::getName)).getDuplicates();
}
Set<String> remoteInputPortIds = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/94869042/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/v2/ConfigSchemaV2.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/v2/ConfigSchemaV2.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/v2/ConfigSchemaV2.java
index 20a9da2..d73d8ae 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/v2/ConfigSchemaV2.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/v2/ConfigSchemaV2.java
@@ -33,18 +33,15 @@ import org.apache.nifi.minifi.commons.schema.RemoteInputPortSchema;
import org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema;
import org.apache.nifi.minifi.commons.schema.SecurityPropertiesSchema;
import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
+import org.apache.nifi.minifi.commons.schema.common.CollectionOverlap;
import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema;
import org.apache.nifi.minifi.commons.schema.common.StringUtil;
-import org.apache.nifi.minifi.commons.schema.common.WritableSchema;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMPONENT_STATUS_REPO_KEY;
@@ -56,7 +53,7 @@ import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PR
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPO_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SECURITY_PROPS_KEY;
-public class ConfigSchemaV2 extends BaseSchema implements WritableSchema, ConvertableSchema<ConfigSchema> {
+public class ConfigSchemaV2 extends BaseSchema implements ConvertableSchema<ConfigSchema> {
public static final int CONFIG_VERSION = 2;
public static final String VERSION = "MiNiFi Config Version";
public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_INPUT_PORT_IDS = "Found the following duplicate remote input port ids: ";
@@ -71,6 +68,7 @@ public class ConfigSchemaV2 extends BaseSchema implements WritableSchema, Conver
public static final String FOUND_THE_FOLLOWING_DUPLICATE_FUNNEL_IDS = "Found the following duplicate funnel ids: ";
public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESS_GROUP_NAMES = "Found the following duplicate remote process group names: ";
public static String TOP_LEVEL_NAME = "top level";
+
private FlowControllerSchema flowControllerProperties;
private CorePropertiesSchema coreProperties;
private FlowFileRepositorySchema flowfileRepositoryProperties;
@@ -79,7 +77,6 @@ public class ConfigSchemaV2 extends BaseSchema implements WritableSchema, Conver
private SecurityPropertiesSchema securityProperties;
private ProcessGroupSchemaV2 processGroupSchema;
private ProvenanceReportingSchema provenanceReportingProperties;
-
private ProvenanceRepositorySchema provenanceRepositorySchema;
public ConfigSchemaV2(Map map) {
@@ -133,29 +130,24 @@ public class ConfigSchemaV2 extends BaseSchema implements WritableSchema, Conver
checkForDuplicates(this::addValidationIssue, FOUND_THE_FOLLOWING_DUPLICATE_OUTPUT_PORT_IDS, allOutputPortIds);
// Potential connection sources and destinations need to have unique ids
- OverlapResults<String> overlapResults = findOverlap(new HashSet<>(allProcessorIds), new HashSet<>(allRemoteInputPortIds), new HashSet<>(allInputPortIds), new HashSet<>(allOutputPortIds),
- new HashSet<>(allFunnelIds));
- if (overlapResults.duplicates.size() > 0) {
- addValidationIssue(FOUND_THE_FOLLOWING_DUPLICATE_IDS + overlapResults.duplicates.stream().sorted().collect(Collectors.joining(", ")));
+ CollectionOverlap<String> overlapResults = new CollectionOverlap<>(new HashSet<>(allProcessorIds), new HashSet<>(allRemoteInputPortIds), new HashSet<>(allInputPortIds),
+ new HashSet<>(allOutputPortIds), new HashSet<>(allFunnelIds));
+ if (overlapResults.getDuplicates().size() > 0) {
+ addValidationIssue(FOUND_THE_FOLLOWING_DUPLICATE_IDS + overlapResults.getDuplicates().stream().sorted().collect(Collectors.joining(", ")));
}
allConnectionSchemas.forEach(c -> {
String destinationId = c.getDestinationId();
- if (!StringUtil.isNullOrEmpty(destinationId) && !overlapResults.seen.contains(destinationId)) {
+ if (!StringUtil.isNullOrEmpty(destinationId) && !overlapResults.getElements().contains(destinationId)) {
addValidationIssue(CONNECTION_WITH_ID + c.getId() + HAS_INVALID_DESTINATION_ID + destinationId);
}
String sourceId = c.getSourceId();
- if (!StringUtil.isNullOrEmpty(sourceId) && !overlapResults.seen.contains(sourceId)) {
+ if (!StringUtil.isNullOrEmpty(sourceId) && !overlapResults.getElements().contains(sourceId)) {
addValidationIssue(CONNECTION_WITH_ID + c.getId() + HAS_INVALID_SOURCE_ID + sourceId);
}
});
}
- protected static <T> OverlapResults<T> findOverlap(Collection<T>... collections) {
- Set<T> seen = new HashSet<>();
- return new OverlapResults<>(seen, Arrays.stream(collections).flatMap(c -> c.stream()).sequential().filter(s -> !seen.add(s)).collect(Collectors.toSet()));
- }
-
public static List<ProcessGroupSchemaV2> getAllProcessGroups(ProcessGroupSchemaV2 processGroupSchema) {
List<ProcessGroupSchemaV2> result = new ArrayList<>();
addProcessGroups(processGroupSchema, result);
@@ -167,7 +159,13 @@ public class ConfigSchemaV2 extends BaseSchema implements WritableSchema, Conver
processGroupSchema.getProcessGroupSchemas().forEach(p -> addProcessGroups(p, result));
}
- public Map<String, Object> toMap() {
+ @Override
+ public int getVersion() {
+ return CONFIG_VERSION;
+ }
+
+ @Override
+ public ConfigSchema convert() {
Map<String, Object> result = mapSupplier.get();
result.put(VERSION, getVersion());
putIfNotNull(result, FLOW_CONTROLLER_PROPS_KEY, flowControllerProperties);
@@ -179,64 +177,6 @@ public class ConfigSchemaV2 extends BaseSchema implements WritableSchema, Conver
putIfNotNull(result, SECURITY_PROPS_KEY, securityProperties);
result.putAll(processGroupSchema.toMap());
putIfNotNull(result, PROVENANCE_REPORTING_KEY, provenanceReportingProperties);
- return result;
- }
-
- public FlowControllerSchema getFlowControllerProperties() {
- return flowControllerProperties;
- }
-
- public CorePropertiesSchema getCoreProperties() {
- return coreProperties;
- }
-
- public FlowFileRepositorySchema getFlowfileRepositoryProperties() {
- return flowfileRepositoryProperties;
- }
-
- public ContentRepositorySchema getContentRepositoryProperties() {
- return contentRepositoryProperties;
- }
-
- public SecurityPropertiesSchema getSecurityProperties() {
- return securityProperties;
- }
-
- public ProcessGroupSchemaV2 getProcessGroupSchema() {
- return processGroupSchema;
- }
-
- public ProvenanceReportingSchema getProvenanceReportingProperties() {
- return provenanceReportingProperties;
- }
-
- public ComponentStatusRepositorySchema getComponentStatusRepositoryProperties() {
- return componentStatusRepositoryProperties;
- }
-
- public ProvenanceRepositorySchema getProvenanceRepositorySchema() {
- return provenanceRepositorySchema;
- }
-
- @Override
- public int getVersion() {
- return CONFIG_VERSION;
- }
-
- @Override
- public ConfigSchema convert() {
- Map<String, Object> map = this.toMap();
- List<String> validationIssues = getValidationIssues();
- return new ConfigSchema(map, validationIssues);
- }
-
- private static class OverlapResults<T> {
- private final Set<T> seen;
- private final Set<T> duplicates;
-
- private OverlapResults(Set<T> seen, Set<T> duplicates) {
- this.seen = seen;
- this.duplicates = duplicates;
- }
+ return new ConfigSchema(result, getValidationIssues());
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/94869042/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/v2/ProcessGroupSchemaV2.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/v2/ProcessGroupSchemaV2.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/v2/ProcessGroupSchemaV2.java
index a9f0449..2e8559a 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/v2/ProcessGroupSchemaV2.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/v2/ProcessGroupSchemaV2.java
@@ -1,20 +1,18 @@
/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.nifi.minifi.commons.schema.v2;
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/94869042/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/v2/RemoteProcessGroupSchemaV2.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/v2/RemoteProcessGroupSchemaV2.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/v2/RemoteProcessGroupSchemaV2.java
new file mode 100644
index 0000000..3e1b762
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/v2/RemoteProcessGroupSchemaV2.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.schema.v2;
+
+import org.apache.nifi.minifi.commons.schema.RemoteInputPortSchema;
+import org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema;
+import org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema.TransportProtocolOptions;
+import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
+import org.apache.nifi.minifi.commons.schema.common.BaseSchemaWithIdAndName;
+import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema.DEFAULT_COMMENT;
+import static org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema.DEFAULT_TIMEOUT;
+import static org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema.DEFAULT_TRANSPORT_PROTOCOL;
+import static org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema.DEFAULT_YIELD_PERIOD;
+import static org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema.TIMEOUT_KEY;
+import static org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema.TRANSPORT_PROTOCOL_KEY;
+import static org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema.URL_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMMENT_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.INPUT_PORTS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.YIELD_PERIOD_KEY;
+
+public class RemoteProcessGroupSchemaV2 extends BaseSchema implements ConvertableSchema<RemoteProcessGroupSchema> {
+ private BaseSchemaWithIdAndName idAndName;
+ private String url;
+ private List<RemoteInputPortSchema> inputPorts;
+
+ private String comment = DEFAULT_COMMENT;
+ private String timeout = DEFAULT_TIMEOUT;
+ private String yieldPeriod = DEFAULT_YIELD_PERIOD;
+ private String transportProtocol = DEFAULT_TRANSPORT_PROTOCOL;
+
+ public RemoteProcessGroupSchemaV2(Map map) {
+ idAndName = new BaseSchemaWithIdAndName(map, "RemoteProcessGroup(id: {id}, name: {name})");
+
+ String wrapperName = idAndName.getWrapperName();
+ url = getRequiredKeyAsType(map, URL_KEY, String.class, wrapperName);
+ inputPorts = convertListToType(getRequiredKeyAsType(map, INPUT_PORTS_KEY, List.class, wrapperName), "input port", RemoteInputPortSchema.class, INPUT_PORTS_KEY);
+ if (inputPorts != null) {
+ for (RemoteInputPortSchema remoteInputPortSchema: inputPorts) {
+ addIssuesIfNotNull(remoteInputPortSchema);
+ }
+ }
+
+ comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, wrapperName, DEFAULT_COMMENT);
+ timeout = getOptionalKeyAsType(map, TIMEOUT_KEY, String.class, wrapperName, DEFAULT_TIMEOUT);
+ yieldPeriod = getOptionalKeyAsType(map, YIELD_PERIOD_KEY, String.class, wrapperName, DEFAULT_YIELD_PERIOD);
+ transportProtocol = getOptionalKeyAsType(map, TRANSPORT_PROTOCOL_KEY, String.class, wrapperName, DEFAULT_TRANSPORT_PROTOCOL);
+
+ if (!TransportProtocolOptions.valid(transportProtocol)){
+ addValidationIssue(TRANSPORT_PROTOCOL_KEY, wrapperName, "it must be either 'RAW' or 'HTTP' but is '" + transportProtocol + "'");
+ }
+ }
+
+ @Override
+ public RemoteProcessGroupSchema convert() {
+ Map<String, Object> result = idAndName.toMap();
+ result.put(URL_KEY, url);
+ result.put(COMMENT_KEY, comment);
+ result.put(TIMEOUT_KEY, timeout);
+ result.put(YIELD_PERIOD_KEY, yieldPeriod);
+ result.put(TRANSPORT_PROTOCOL_KEY, transportProtocol);
+ putListIfNotNull(result, INPUT_PORTS_KEY, inputPorts);
+ return new RemoteProcessGroupSchema(result);
+ }
+
+ @Override
+ public List<String> getValidationIssues() {
+ List<String> validationIssues = new ArrayList<>(idAndName.getValidationIssues());
+ validationIssues.addAll(super.getValidationIssues());
+ return validationIssues;
+ }
+
+ @Override
+ public int getVersion() {
+ return ConfigSchemaV2.CONFIG_VERSION;
+ }
+
+ public String getTransportProtocol() {
+ return transportProtocol;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/94869042/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/RemoteProcessGroupSchemaTest.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/RemoteProcessGroupSchemaTest.java b/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/RemoteProcessGroupSchemaTest.java
index aa7ef61..2339f40 100644
--- a/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/RemoteProcessGroupSchemaTest.java
+++ b/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/RemoteProcessGroupSchemaTest.java
@@ -19,6 +19,7 @@
package org.apache.nifi.minifi.commons.schema;
+import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
import org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys;
import org.junit.Test;
@@ -28,11 +29,18 @@ import java.util.Map;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class RemoteProcessGroupSchemaTest {
@Test
public void testNoPropertiesSet() {
- validateIssuesNumMatches(3, new RemoteProcessGroupSchema(new HashMap<>()));
+ RemoteProcessGroupSchema remoteProcessGroupSchema = new RemoteProcessGroupSchema(new HashMap<>());
+ validateIssuesNumMatches(3, remoteProcessGroupSchema);
+ assertEquals(RemoteProcessGroupSchema.DEFAULT_PROXY_HOST, remoteProcessGroupSchema.getProxyHost());
+ assertEquals(RemoteProcessGroupSchema.DEFAULT_PROXY_PORT, remoteProcessGroupSchema.getProxyPort());
+ assertEquals(RemoteProcessGroupSchema.DEFAULT_PROXY_USER, remoteProcessGroupSchema.getProxyUser());
+ assertEquals(RemoteProcessGroupSchema.DEFAULT_PROXY_PASSWORD, remoteProcessGroupSchema.getProxyPassword());
}
@Test
@@ -64,6 +72,52 @@ public class RemoteProcessGroupSchemaTest {
assertEquals(second.getTransportProtocol(), "HTTP");
}
+ /**
+ * Walks one map through several proxy configurations, rebuilding the schema after each change and
+ * checking which proxy-related validation issues are (or are no longer) reported.
+ */
+ @Test
+ public void testProxySettings() {
+ // Stage 1: proxy port and proxy user without a proxy host; both must be reported as needing a host.
+ Map<String, Object> map = new HashMap<>();
+ map.put(RemoteProcessGroupSchema.PROXY_PORT_KEY, 1234);
+ map.put(RemoteProcessGroupSchema.PROXY_USER_KEY, "user");
+ RemoteProcessGroupSchema remoteProcessGroupSchema = new RemoteProcessGroupSchema(map);
+ assertTrue(remoteProcessGroupSchema.getValidationIssues().contains(BaseSchema.getIssueText(RemoteProcessGroupSchema.PROXY_PORT_KEY, remoteProcessGroupSchema.getWrapperName(),
+ RemoteProcessGroupSchema.EXPECTED_PROXY_HOST_IF_PROXY_PORT)));
+ assertTrue(remoteProcessGroupSchema.getValidationIssues().contains(BaseSchema.getIssueText(RemoteProcessGroupSchema.PROXY_USER_KEY, remoteProcessGroupSchema.getWrapperName(),
+ RemoteProcessGroupSchema.EXPECTED_PROXY_HOST_IF_PROXY_USER)));
+ map.remove(RemoteProcessGroupSchema.PROXY_USER_KEY);
+
+ // Stage 2: host and port only, no transport protocol in the map; the port issue is gone but the
+ // host is flagged with S2S_PROXY_REQUIRES_HTTP.
+ map.put(RemoteProcessGroupSchema.PROXY_HOST_KEY, "host");
+ remoteProcessGroupSchema = new RemoteProcessGroupSchema(map);
+ assertFalse(remoteProcessGroupSchema.getValidationIssues().contains(BaseSchema.getIssueText(RemoteProcessGroupSchema.PROXY_PORT_KEY, remoteProcessGroupSchema.getWrapperName(),
+ RemoteProcessGroupSchema.EXPECTED_PROXY_HOST_IF_PROXY_PORT)));
+ assertTrue(remoteProcessGroupSchema.getValidationIssues().contains(BaseSchema.getIssueText(RemoteProcessGroupSchema.PROXY_HOST_KEY, remoteProcessGroupSchema.getWrapperName(),
+ RemoteProcessGroupSchema.S2S_PROXY_REQUIRES_HTTP)));
+
+ // Stage 3: a password without a user must be flagged.
+ map.put(RemoteProcessGroupSchema.PROXY_PASSWORD_KEY, "password");
+ remoteProcessGroupSchema = new RemoteProcessGroupSchema(map);
+ assertTrue(remoteProcessGroupSchema.getValidationIssues().contains(BaseSchema.getIssueText(RemoteProcessGroupSchema.PROXY_PASSWORD_KEY, remoteProcessGroupSchema.getWrapperName(),
+ RemoteProcessGroupSchema.EXPECTED_PROXY_USER_IF_PROXY_PASSWORD)));
+
+ // Stage 4: a user without a password must be flagged.
+ map.remove(RemoteProcessGroupSchema.PROXY_PASSWORD_KEY);
+ map.put(RemoteProcessGroupSchema.PROXY_USER_KEY, "user");
+ remoteProcessGroupSchema = new RemoteProcessGroupSchema(map);
+ assertTrue(remoteProcessGroupSchema.getValidationIssues().contains(BaseSchema.getIssueText(RemoteProcessGroupSchema.PROXY_USER_KEY, remoteProcessGroupSchema.getWrapperName(),
+ RemoteProcessGroupSchema.EXPECTED_PROXY_PASSWORD_IF_PROXY_USER)));
+
+ // Stage 5: host, port, user and password with HTTP transport; none of the three issues below may remain.
+ map.put(RemoteProcessGroupSchema.PROXY_PASSWORD_KEY, "password");
+ map.put(RemoteProcessGroupSchema.TRANSPORT_PROTOCOL_KEY, RemoteProcessGroupSchema.TransportProtocolOptions.HTTP.name());
+ remoteProcessGroupSchema = new RemoteProcessGroupSchema(map);
+ assertFalse(remoteProcessGroupSchema.getValidationIssues().contains(BaseSchema.getIssueText(RemoteProcessGroupSchema.PROXY_PASSWORD_KEY, remoteProcessGroupSchema.getWrapperName(),
+ RemoteProcessGroupSchema.EXPECTED_PROXY_USER_IF_PROXY_PASSWORD)));
+ assertFalse(remoteProcessGroupSchema.getValidationIssues().contains(BaseSchema.getIssueText(RemoteProcessGroupSchema.PROXY_USER_KEY, remoteProcessGroupSchema.getWrapperName(),
+ RemoteProcessGroupSchema.EXPECTED_PROXY_PASSWORD_IF_PROXY_USER)));
+ assertFalse(remoteProcessGroupSchema.getValidationIssues().contains(BaseSchema.getIssueText(RemoteProcessGroupSchema.PROXY_HOST_KEY, remoteProcessGroupSchema.getWrapperName(),
+ RemoteProcessGroupSchema.S2S_PROXY_REQUIRES_HTTP)));
+
+ // The values put into the map are returned by the corresponding getters.
+ assertEquals("host", remoteProcessGroupSchema.getProxyHost());
+ assertEquals(Integer.valueOf(1234), remoteProcessGroupSchema.getProxyPort());
+ assertEquals("user", remoteProcessGroupSchema.getProxyUser());
+ assertEquals("password", remoteProcessGroupSchema.getProxyPassword());
+ }
+
private PortSchema createPortSchema(String id, String name, String wrapperName) {
Map<String, Object> map = new HashMap<>();
map.put(CommonPropertyKeys.ID_KEY, id);
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/94869042/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v2/RemoteProcessGroupSchemaV2Test.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v2/RemoteProcessGroupSchemaV2Test.java b/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v2/RemoteProcessGroupSchemaV2Test.java
new file mode 100644
index 0000000..5186d56
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v2/RemoteProcessGroupSchemaV2Test.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.nifi.minifi.commons.schema.v2;
+
+import org.apache.nifi.minifi.commons.schema.ConfigSchema;
+import org.apache.nifi.minifi.commons.schema.PortSchema;
+import org.apache.nifi.minifi.commons.schema.RemoteProcessGroupSchema;
+import org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link RemoteProcessGroupSchemaV2}: checks the number of validation issues reported for an
+ * empty map, for a minimal valid map, and for valid and invalid transport protocol values.
+ */
+public class RemoteProcessGroupSchemaV2Test {
+ /** An empty property map is expected to yield exactly three validation issues. */
+ @Test
+ public void testNoPropertiesSet() {
+ validateIssuesNumMatches(3, new RemoteProcessGroupSchemaV2(new HashMap<>()));
+ }
+
+ /** A map with an id, a url and one input port is expected to validate without issues. */
+ @Test
+ public void testInputPortsRootGroup() {
+ validateIssuesNumMatches(0, new RemoteProcessGroupSchemaV2(createValidMap()));
+ }
+
+ /** An unknown transport protocol adds one issue; "RAW" and "HTTP" are accepted and returned unchanged. */
+ @Test
+ public void testTransportProtocol() {
+ Map<String, Object> map = createValidMap();
+ map.put(RemoteProcessGroupSchema.TRANSPORT_PROTOCOL_KEY, "not valid");
+ validateIssuesNumMatches(1, new RemoteProcessGroupSchemaV2(map));
+
+ map.put(RemoteProcessGroupSchema.TRANSPORT_PROTOCOL_KEY, "RAW");
+ RemoteProcessGroupSchemaV2 first = new RemoteProcessGroupSchemaV2(map);
+ validateIssuesNumMatches(0, first);
+ // JUnit's assertEquals takes (expected, actual); this order keeps failure messages accurate.
+ assertEquals("RAW", first.getTransportProtocol());
+
+ map.put(RemoteProcessGroupSchema.TRANSPORT_PROTOCOL_KEY, "HTTP");
+ RemoteProcessGroupSchemaV2 second = new RemoteProcessGroupSchemaV2(map);
+ validateIssuesNumMatches(0, second);
+ assertEquals("HTTP", second.getTransportProtocol());
+ }
+
+ /** Builds a fresh, mutable property map holding an id, a url and a single input port. */
+ private Map<String, Object> createValidMap() {
+ Map<String, Object> map = new HashMap<>();
+ map.put(CommonPropertyKeys.INPUT_PORTS_KEY, Arrays.asList(createPortSchema("f94d2469-39f8-4f07-a0d8-acd9396f639e", "testName", ConfigSchema.TOP_LEVEL_NAME).toMap()));
+ map.put(RemoteProcessGroupSchema.URL_KEY, "http://localhost:8080/nifi");
+ map.put(CommonPropertyKeys.ID_KEY, "a58d2fab-7efe-4cb7-8224-12a60bd8003d");
+ return map;
+ }
+
+ /** Creates a {@link PortSchema} from the given id and name under the given wrapper name. */
+ private PortSchema createPortSchema(String id, String name, String wrapperName) {
+ Map<String, Object> map = new HashMap<>();
+ map.put(CommonPropertyKeys.ID_KEY, id);
+ map.put(CommonPropertyKeys.NAME_KEY, name);
+ return new PortSchema(map, wrapperName);
+ }
+
+ /**
+ * Asserts that the schema reports exactly {@code expected} validation issues; on failure the message
+ * lists every reported issue on its own line.
+ */
+ private void validateIssuesNumMatches(int expected, RemoteProcessGroupSchemaV2 remoteProcessGroupSchema) {
+ int actual = remoteProcessGroupSchema.getValidationIssues().size();
+ String issues = "[" + System.lineSeparator() + remoteProcessGroupSchema.getValidationIssues().stream().collect(Collectors.joining("," + System.lineSeparator())) + "]";
+ assertEquals("Expected " + expected + " issue(s), got " + actual + ": " + issues, expected, actual);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/94869042/minifi-docs/src/main/markdown/System_Admin_Guide.md
----------------------------------------------------------------------
diff --git a/minifi-docs/src/main/markdown/System_Admin_Guide.md b/minifi-docs/src/main/markdown/System_Admin_Guide.md
index d95cd84..a4e0f31 100644
--- a/minifi-docs/src/main/markdown/System_Admin_Guide.md
+++ b/minifi-docs/src/main/markdown/System_Admin_Guide.md
@@ -349,6 +349,7 @@ parses and upconverts to the current version without issue.
### Version 2 -> Version 3 changes
1. Added support for Controller Services.
+2. Added support for Site-To-Site over proxy.
## Flow Controller
@@ -582,7 +583,7 @@ queue prioritizer class | This configuration option specifies the fully qualifi
## Remote Process Groups
-MiNiFi can be used to send data using the Site to Site protocol (via a Remote Process Group) or a Processor. These properties configure the Remote Process Groups that use Site-To-Site to send data to a core instance.
+MiNiFi can be used to send data using the Site-To-Site protocol (via a Remote Process Group) or a Processor. These properties configure the Remote Process Groups that use Site-To-Site to send data to a core instance. The proxy settings apply only when the transport protocol is "HTTP".
*Property* | *Description*
-------------------| -------------
@@ -592,6 +593,10 @@ url | The URL of the core NiFi instance.
timeout | How long MiNiFi should wait before timing out the connection.
yield period | When communication with this Remote Process Group fails, it will not be scheduled again for this amount of time.
transport protocol | The transport protocol to use for this Remote Process Group. Can be either "RAW" or "HTTP"
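+proxy host | The hostname of the proxy server.
+proxy port | The port to connect to on the proxy server. Requires proxy host to be set.
+proxy user | The user name used to authenticate with the proxy server. Requires proxy host and proxy password to be set.
+proxy password | The password used to authenticate with the proxy server. Requires proxy user to be set.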
#### Input Ports Subsection
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/94869042/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/RemoteProcessGroupSchemaFunction.java
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/RemoteProcessGroupSchemaFunction.java b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/RemoteProcessGroupSchemaFunction.java
index 1dcc511..e7d5fa0 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/RemoteProcessGroupSchemaFunction.java
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/RemoteProcessGroupSchemaFunction.java
@@ -59,6 +59,10 @@ public class RemoteProcessGroupSchemaFunction implements Function<RemoteProcessG
map.put(RemoteProcessGroupSchema.TIMEOUT_KEY, remoteProcessGroupDTO.getCommunicationsTimeout());
map.put(CommonPropertyKeys.YIELD_PERIOD_KEY, remoteProcessGroupDTO.getYieldDuration());
map.put(RemoteProcessGroupSchema.TRANSPORT_PROTOCOL_KEY, remoteProcessGroupDTO.getTransportProtocol());
+ map.put(RemoteProcessGroupSchema.PROXY_HOST_KEY, remoteProcessGroupDTO.getProxyHost());
+ map.put(RemoteProcessGroupSchema.PROXY_PORT_KEY, remoteProcessGroupDTO.getProxyPort());
+ map.put(RemoteProcessGroupSchema.PROXY_USER_KEY, remoteProcessGroupDTO.getProxyUser());
+ map.put(RemoteProcessGroupSchema.PROXY_PASSWORD_KEY, remoteProcessGroupDTO.getProxyPassword());
return new RemoteProcessGroupSchema(map);
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/94869042/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java b/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java
index 597546f..ae5936c 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java
@@ -277,11 +277,12 @@ public class ConfigMainTest {
Map<String, Object> templateMap = ConfigMain.transformTemplateToSchema(getClass().getClassLoader().getResourceAsStream(name + ".xml")).toMap();
Map<String, Object> yamlMap = SchemaLoader.loadYamlAsMap(getClass().getClassLoader().getResourceAsStream(name + ".yml"));
assertNoMapDifferences(templateMap, yamlMap);
+ testV2YmlIfPresent(name, yamlMap);
testV1YmlIfPresent(name, yamlMap);
}
private InputStream upgradeAndReturn(String name) throws FileNotFoundException {
- InputStream yamlV1Stream = getClass().getClassLoader().getResourceAsStream(name + "-v1.yml");
+ InputStream yamlV1Stream = getClass().getClassLoader().getResourceAsStream(name);
if (yamlV1Stream == null) {
return null;
}
@@ -292,8 +293,30 @@ public class ConfigMainTest {
return new ByteArrayInputStream(outputStream.toByteArray());
}
+ /**
+ * Compares a {@code <name>-v2.yml} resource, passed through {@code upgradeAndReturn} and converted, with the
+ * current-version yaml map. Does nothing when {@code upgradeAndReturn} yields {@code null}. The proxy settings
+ * of every remote process group in the current schema are reset to their defaults before comparing.
+ */
+ private void testV2YmlIfPresent(String name, Map<String, Object> yamlMap) throws IOException, SchemaLoaderException {
+ InputStream upgradedV2Stream = upgradeAndReturn(name + "-v2.yml");
+ if (upgradedV2Stream == null) {
+ return;
+ }
+
+ ConvertableSchema<ConfigSchema> convertableV2Schema = SchemaLoader.loadConvertableSchemaFromYaml(upgradedV2Stream);
+ ConfigSchema upgradedFromV2 = convertableV2Schema.convert();
+
+ ConfigSchema currentSchema = new ConfigSchema(yamlMap);
+ ConfigSchema.getAllProcessGroups(currentSchema.getProcessGroupSchema()).stream()
+ .flatMap(processGroup -> processGroup.getRemoteProcessGroups().stream())
+ .forEach(this::clearProxyInfo);
+
+ assertNoMapDifferences(upgradedFromV2.toMap(), currentSchema.toMap());
+ }
+
+ /** Resets the proxy host, port, user and password of the given schema to the {@code DEFAULT_PROXY_*} constants. */
+ private void clearProxyInfo(RemoteProcessGroupSchema remoteProcessGroupSchema) {
+ remoteProcessGroupSchema.setProxyHost(RemoteProcessGroupSchema.DEFAULT_PROXY_HOST);
+ remoteProcessGroupSchema.setProxyPort(RemoteProcessGroupSchema.DEFAULT_PROXY_PORT);
+ remoteProcessGroupSchema.setProxyUser(RemoteProcessGroupSchema.DEFAULT_PROXY_USER);
+ remoteProcessGroupSchema.setProxyPassword(RemoteProcessGroupSchema.DEFAULT_PROXY_PASSWORD);
+ }
+
private void testV1YmlIfPresent(String name, Map<String, Object> yamlMap) throws IOException, SchemaLoaderException {
- InputStream upgradedInputStream = upgradeAndReturn(name);
+ InputStream upgradedInputStream = upgradeAndReturn(name + "-v1.yml");
if (upgradedInputStream != null) {
ConvertableSchema<ConfigSchema> configSchemaConvertableSchema = SchemaLoader.loadConvertableSchemaFromYaml(upgradedInputStream);
ConfigSchema configSchemaUpgradedFromV1 = configSchemaConvertableSchema.convert();
@@ -341,6 +364,11 @@ public class ConfigMainTest {
v1Connection.setSourceId(v1IdToCurrentIdMap.get(v1Connection.getSourceId()));
v1Connection.setDestinationId(v1IdToCurrentIdMap.get(v1Connection.getDestinationId()));
}
+
+ ConfigSchema.getAllProcessGroups(configSchemaFromCurrent.getProcessGroupSchema()).stream().flatMap(p -> p.getRemoteProcessGroups().stream()).forEach(r -> {
+ clearProxyInfo(r);
+ r.setTransportProtocol(RemoteProcessGroupSchema.TransportProtocolOptions.RAW.name());
+ });
Map<String, Object> v1YamlMap = configSchemaUpgradedFromV1.toMap();
assertNoMapDifferences(v1YamlMap, configSchemaFromCurrent.toMap());
}