You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/07/05 19:18:37 UTC
[4/5] nifi-minifi git commit: MINIFI-41 - CLI utility for
template.xml -> YAML and YAML validation
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/schema/SecurityPropertiesSchemaTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/schema/SecurityPropertiesSchemaTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/schema/SecurityPropertiesSchemaTest.java
deleted file mode 100644
index ee889d2..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/schema/SecurityPropertiesSchemaTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.util.schema;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.HashMap;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class SecurityPropertiesSchemaTest {
- private SecurityPropertiesSchema securityPropertiesSchema;
-
- @Before
- public void setup() {
- securityPropertiesSchema = new SecurityPropertiesSchema(new HashMap());
- }
-
- @Test
- public void testKeystoreDefault() {
- assertEquals("", securityPropertiesSchema.getKeystore());
- }
-
- @Test
- public void testTruststoreDefault() {
- assertEquals("", securityPropertiesSchema.getTruststore());
- }
-
- @Test
- public void testSslProtocolDefault() {
- assertEquals("", securityPropertiesSchema.getSslProtocol());
- }
-
- @Test
- public void testKeystoreTypeDefault() {
- assertEquals("", securityPropertiesSchema.getKeystoreType());
- }
-
- @Test
- public void testKeyStorePasswdDefault() {
- assertEquals("", securityPropertiesSchema.getKeystorePassword());
- }
-
- @Test
- public void testKeyPasswordDefault() {
- assertEquals("", securityPropertiesSchema.getKeyPassword());
- }
-
- @Test
- public void testTruststoreTypeDefault() {
- assertEquals("", securityPropertiesSchema.getTruststoreType());
- }
-
- @Test
- public void testTruststorePasswdDefault() {
- assertEquals("", securityPropertiesSchema.getTruststorePassword());
- }
-
- @Test
- public void testEmptyMapConstructorValid() {
- assertTrue(securityPropertiesSchema.isValid());
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/pom.xml b/minifi-commons/minifi-commons-schema/pom.xml
new file mode 100644
index 0000000..c2310dd
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/pom.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi.minifi</groupId>
+ <artifactId>minifi-commons</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>minifi-commons-schema</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.yaml</groupId>
+ <artifactId>snakeyaml</artifactId>
+ <version>1.17</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ComponentStatusRepositorySchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ComponentStatusRepositorySchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ComponentStatusRepositorySchema.java
new file mode 100644
index 0000000..02f3a78
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ComponentStatusRepositorySchema.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
+
+import java.util.Map;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMPONENT_STATUS_REPO_KEY;
+
+/**
+ * Holds the "buffer size" and "snapshot frequency" settings of the component status
+ * repository section of the configuration.
+ *
+ * <p>Both settings start at their {@code DEFAULT_*} values; the map constructor re-reads them
+ * through {@code getOptionalKeyAsType}, passing the same defaults as the fallback argument.
+ */
+public class ComponentStatusRepositorySchema extends BaseSchema {
+    public static final String BUFFER_SIZE_KEY = "buffer size";
+    public static final String SNAPSHOT_FREQUENCY_KEY = "snapshot frequency";
+
+    public static final int DEFAULT_BUFFER_SIZE = 1440;
+    public static final String DEFAULT_SNAPSHOT_FREQUENCY = "1 min";
+
+    private Number bufferSize = DEFAULT_BUFFER_SIZE;
+    private String snapshotFrequency = DEFAULT_SNAPSHOT_FREQUENCY;
+
+    /** Creates a schema whose fields keep their default values. */
+    public ComponentStatusRepositorySchema() {
+    }
+
+    /**
+     * Creates a schema from the given section map.
+     *
+     * @param map the raw section contents, keyed by {@link #BUFFER_SIZE_KEY} and
+     *            {@link #SNAPSHOT_FREQUENCY_KEY}
+     */
+    public ComponentStatusRepositorySchema(Map map) {
+        this.bufferSize = getOptionalKeyAsType(map, BUFFER_SIZE_KEY, Number.class, COMPONENT_STATUS_REPO_KEY, DEFAULT_BUFFER_SIZE);
+        this.snapshotFrequency = getOptionalKeyAsType(map, SNAPSHOT_FREQUENCY_KEY, String.class, COMPONENT_STATUS_REPO_KEY, DEFAULT_SNAPSHOT_FREQUENCY);
+    }
+
+    /**
+     * Writes both settings into a map obtained from {@code mapSupplier}.
+     *
+     * @return a map containing the buffer size followed by the snapshot frequency
+     */
+    @Override
+    public Map<String, Object> toMap() {
+        final Map<String, Object> serialized = mapSupplier.get();
+        serialized.put(BUFFER_SIZE_KEY, this.bufferSize);
+        serialized.put(SNAPSHOT_FREQUENCY_KEY, this.snapshotFrequency);
+        return serialized;
+    }
+
+    /** @return the configured buffer size */
+    public Number getBufferSize() {
+        return this.bufferSize;
+    }
+
+    /** @return the configured snapshot frequency, e.g. {@code "1 min"} */
+    public String getSnapshotFrequency() {
+        return this.snapshotFrequency;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
new file mode 100644
index 0000000..57f9124
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConfigSchema.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMPONENT_STATUS_REPO_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONNECTIONS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONTENT_REPO_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CORE_PROPS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOWFILE_REPO_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOW_CONTROLLER_PROPS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROCESSORS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPORTING_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPO_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.REMOTE_PROCESSING_GROUPS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SECURITY_PROPS_KEY;
+
+/**
+ * Top-level schema of a configuration file.
+ *
+ * <p>The constructor reads every top-level section from the supplied map, converts the
+ * processor, connection and remote processing group lists into their schema types, collects
+ * the validation issues of every child schema and reports duplicate names within each list.
+ */
+public class ConfigSchema extends BaseSchema {
+    public static final String FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_NAMES = "Found the following duplicate processor names: ";
+    public static final String FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_NAMES = "Found the following duplicate connection names: ";
+    public static final String FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES = "Found the following duplicate remote processing group names: ";
+    // Constant label passed to the BaseSchema helpers for keys read at the root of the document.
+    public static final String TOP_LEVEL_NAME = "top level";
+
+    private FlowControllerSchema flowControllerProperties;
+    private CorePropertiesSchema coreProperties;
+    private FlowFileRepositorySchema flowfileRepositoryProperties;
+    private ContentRepositorySchema contentRepositoryProperties;
+    private ComponentStatusRepositorySchema componentStatusRepositoryProperties;
+    private SecurityPropertiesSchema securityProperties;
+    private List<ProcessorSchema> processors;
+    private List<ConnectionSchema> connections;
+    private List<RemoteProcessingGroupSchema> remoteProcessingGroups;
+    private ProvenanceReportingSchema provenanceReportingProperties;
+
+    private ProvenanceRepositorySchema provenanceRepositorySchema;
+
+    /**
+     * Builds the schema tree from the parsed configuration document.
+     *
+     * @param map the root map of the configuration document
+     */
+    public ConfigSchema(Map map) {
+        flowControllerProperties = getMapAsType(map, FLOW_CONTROLLER_PROPS_KEY, FlowControllerSchema.class, TOP_LEVEL_NAME, true);
+
+        coreProperties = getMapAsType(map, CORE_PROPS_KEY, CorePropertiesSchema.class, TOP_LEVEL_NAME, false);
+        flowfileRepositoryProperties = getMapAsType(map, FLOWFILE_REPO_KEY, FlowFileRepositorySchema.class, TOP_LEVEL_NAME, false);
+        contentRepositoryProperties = getMapAsType(map, CONTENT_REPO_KEY, ContentRepositorySchema.class, TOP_LEVEL_NAME, false);
+        provenanceRepositorySchema = getMapAsType(map, PROVENANCE_REPO_KEY, ProvenanceRepositorySchema.class, TOP_LEVEL_NAME, false);
+        componentStatusRepositoryProperties = getMapAsType(map, COMPONENT_STATUS_REPO_KEY, ComponentStatusRepositorySchema.class, TOP_LEVEL_NAME, false);
+        securityProperties = getMapAsType(map, SECURITY_PROPS_KEY, SecurityPropertiesSchema.class, TOP_LEVEL_NAME, false);
+
+        processors = getOptionalKeyAsType(map, PROCESSORS_KEY, List.class, TOP_LEVEL_NAME, null);
+        if (processors != null) {
+            transformListToType(processors, "processor", ProcessorSchema.class, PROCESSORS_KEY);
+        }
+
+        connections = getOptionalKeyAsType(map, CONNECTIONS_KEY, List.class, TOP_LEVEL_NAME, null);
+        if (connections != null) {
+            transformListToType(connections, "connection", ConnectionSchema.class, CONNECTIONS_KEY);
+        }
+
+        remoteProcessingGroups = getOptionalKeyAsType(map, REMOTE_PROCESSING_GROUPS_KEY, List.class, TOP_LEVEL_NAME, null);
+        if (remoteProcessingGroups != null) {
+            transformListToType(remoteProcessingGroups, "remote processing group", RemoteProcessingGroupSchema.class, REMOTE_PROCESSING_GROUPS_KEY);
+        }
+
+        provenanceReportingProperties = getMapAsType(map, PROVENANCE_REPORTING_KEY, ProvenanceReportingSchema.class, TOP_LEVEL_NAME, false, false);
+
+        addIssuesIfNotNull(flowControllerProperties);
+        addIssuesIfNotNull(coreProperties);
+        addIssuesIfNotNull(flowfileRepositoryProperties);
+        addIssuesIfNotNull(contentRepositoryProperties);
+        addIssuesIfNotNull(componentStatusRepositoryProperties);
+        addIssuesIfNotNull(securityProperties);
+        addIssuesIfNotNull(provenanceReportingProperties);
+        addIssuesIfNotNull(provenanceRepositorySchema);
+
+        if (processors != null) {
+            checkForDuplicateNames(FOUND_THE_FOLLOWING_DUPLICATE_PROCESSOR_NAMES, processors.stream().map(ProcessorSchema::getName).collect(Collectors.toList()));
+            for (ProcessorSchema processorSchema : processors) {
+                addIssuesIfNotNull(processorSchema);
+            }
+        }
+
+        if (connections != null) {
+            checkForDuplicateNames(FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_NAMES, connections.stream().map(ConnectionSchema::getName).collect(Collectors.toList()));
+            for (ConnectionSchema connectionSchema : connections) {
+                addIssuesIfNotNull(connectionSchema);
+            }
+        }
+
+        if (remoteProcessingGroups != null) {
+            checkForDuplicateNames(FOUND_THE_FOLLOWING_DUPLICATE_REMOTE_PROCESSING_GROUP_NAMES, remoteProcessingGroups.stream().map(RemoteProcessingGroupSchema::getName).collect(Collectors.toList()));
+            for (RemoteProcessingGroupSchema remoteProcessingGroupSchema : remoteProcessingGroups) {
+                addIssuesIfNotNull(remoteProcessingGroupSchema);
+            }
+        }
+    }
+
+    /**
+     * Adds one validation issue listing every name that occurs more than once in {@code names}.
+     *
+     * <p>The issue text is {@code errorMessagePrefix} followed by the duplicated names in sorted
+     * order, separated by {@code ", "}. Nothing is added when there are no duplicates.
+     *
+     * @param errorMessagePrefix text placed in front of the list of duplicated names
+     * @param names the names to inspect; may be {@code null}, in which case nothing is checked
+     */
+    private void checkForDuplicateNames(String errorMessagePrefix, List<String> names) {
+        // Guard on the argument itself. Guarding on the processors field (as before) skipped the
+        // connection and remote processing group checks whenever no processors were configured.
+        if (names == null) {
+            return;
+        }
+        Set<String> seenNames = new HashSet<>();
+        Set<String> duplicateNames = new TreeSet<>();
+        for (String name : names) {
+            // Null names are skipped: a naturally ordered TreeSet throws NullPointerException on
+            // add(null). NOTE(review): a missing name is presumably reported by the element's own
+            // required-key validation - confirm against BaseSchema.
+            if (name != null && !seenNames.add(name)) {
+                duplicateNames.add(name);
+            }
+        }
+        if (!duplicateNames.isEmpty()) {
+            validationIssues.add(errorMessagePrefix + String.join(", ", duplicateNames));
+        }
+    }
+
+    /**
+     * Serializes the whole configuration into a map obtained from {@code mapSupplier}.
+     *
+     * <p>The flow controller section is always written; every other section or list goes
+     * through {@code putIfNotNull} / {@code putListIfNotNull}.
+     *
+     * @return the map representation of this configuration
+     */
+    @Override
+    public Map<String, Object> toMap() {
+        Map<String, Object> result = mapSupplier.get();
+        result.put(FLOW_CONTROLLER_PROPS_KEY, flowControllerProperties.toMap());
+        putIfNotNull(result, CORE_PROPS_KEY, coreProperties);
+        putIfNotNull(result, FLOWFILE_REPO_KEY, flowfileRepositoryProperties);
+        putIfNotNull(result, CONTENT_REPO_KEY, contentRepositoryProperties);
+        putIfNotNull(result, PROVENANCE_REPO_KEY, provenanceRepositorySchema);
+        putIfNotNull(result, COMPONENT_STATUS_REPO_KEY, componentStatusRepositoryProperties);
+        putIfNotNull(result, SECURITY_PROPS_KEY, securityProperties);
+        putListIfNotNull(result, PROCESSORS_KEY, processors);
+        putListIfNotNull(result, CONNECTIONS_KEY, connections);
+        putListIfNotNull(result, REMOTE_PROCESSING_GROUPS_KEY, remoteProcessingGroups);
+        putIfNotNull(result, PROVENANCE_REPORTING_KEY, provenanceReportingProperties);
+        return result;
+    }
+
+    /** @return the flow controller section */
+    public FlowControllerSchema getFlowControllerProperties() {
+        return flowControllerProperties;
+    }
+
+    /** @return the core properties section */
+    public CorePropertiesSchema getCoreProperties() {
+        return coreProperties;
+    }
+
+    /** @return the flowfile repository section */
+    public FlowFileRepositorySchema getFlowfileRepositoryProperties() {
+        return flowfileRepositoryProperties;
+    }
+
+    /** @return the content repository section */
+    public ContentRepositorySchema getContentRepositoryProperties() {
+        return contentRepositoryProperties;
+    }
+
+    /** @return the security properties section */
+    public SecurityPropertiesSchema getSecurityProperties() {
+        return securityProperties;
+    }
+
+    /** @return the configured processors, or {@code null} when the list was not read */
+    public List<ProcessorSchema> getProcessors() {
+        return processors;
+    }
+
+    /** @return the configured connections, or {@code null} when the list was not read */
+    public List<ConnectionSchema> getConnections() {
+        return connections;
+    }
+
+    /** @return the configured remote processing groups, or {@code null} when the list was not read */
+    public List<RemoteProcessingGroupSchema> getRemoteProcessingGroups() {
+        return remoteProcessingGroups;
+    }
+
+    /** @return the provenance reporting section */
+    public ProvenanceReportingSchema getProvenanceReportingProperties() {
+        return provenanceReportingProperties;
+    }
+
+    /** @return the component status repository section */
+    public ComponentStatusRepositorySchema getComponentStatusRepositoryProperties() {
+        return componentStatusRepositoryProperties;
+    }
+
+    /** @return the provenance repository section */
+    public ProvenanceRepositorySchema getProvenanceRepositorySchema() {
+        return provenanceRepositorySchema;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java
new file mode 100644
index 0000000..34bd61f
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ConnectionSchema.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
+
+import java.util.Map;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONNECTIONS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY;
+
+/**
+ * Schema of a single entry of the connections list.
+ *
+ * <p>The name, source name, source relationship name and destination name are read with
+ * {@code getRequiredKeyAsType}; the queue limits, flowfile expiration and prioritizer class are
+ * read with {@code getOptionalKeyAsType} and the fallback values declared on this class.
+ */
+public class ConnectionSchema extends BaseSchema {
+    public static final String SOURCE_NAME_KEY = "source name";
+    public static final String SOURCE_RELATIONSHIP_NAME_KEY = "source relationship name";
+    public static final String DESTINATION_NAME_KEY = "destination name";
+    public static final String MAX_WORK_QUEUE_SIZE_KEY = "max work queue size";
+    public static final String MAX_WORK_QUEUE_DATA_SIZE_KEY = "max work queue data size";
+    public static final String FLOWFILE_EXPIRATION__KEY = "flowfile expiration";
+    public static final String QUEUE_PRIORITIZER_CLASS_KEY = "queue prioritizer class";
+
+    public static final long DEFAULT_MAX_WORK_QUEUE_SIZE = 0;
+    public static final String DEFAULT_MAX_QUEUE_DATA_SIZE = "0 MB";
+    public static final String DEFAULT_FLOWFILE_EXPIRATION = "0 sec";
+
+    private String name;
+    private String sourceName;
+    private String sourceRelationshipName;
+    private String destinationName;
+
+    private Number maxWorkQueueSize = DEFAULT_MAX_WORK_QUEUE_SIZE;
+    private String maxWorkQueueDataSize = DEFAULT_MAX_QUEUE_DATA_SIZE;
+    private String flowfileExpiration = DEFAULT_FLOWFILE_EXPIRATION;
+    private String queuePrioritizerClass;
+
+    /**
+     * Creates a connection schema from one element of the connections list.
+     *
+     * @param map the raw key/value pairs describing the connection
+     */
+    public ConnectionSchema(Map map) {
+        // Required identification of the connection and its two endpoints.
+        this.name = getRequiredKeyAsType(map, NAME_KEY, String.class, CONNECTIONS_KEY);
+        this.sourceName = getRequiredKeyAsType(map, SOURCE_NAME_KEY, String.class, CONNECTIONS_KEY);
+        this.sourceRelationshipName = getRequiredKeyAsType(map, SOURCE_RELATIONSHIP_NAME_KEY, String.class, CONNECTIONS_KEY);
+        this.destinationName = getRequiredKeyAsType(map, DESTINATION_NAME_KEY, String.class, CONNECTIONS_KEY);
+
+        // Optional queue tuning; the last argument of each call is the fallback value.
+        this.maxWorkQueueSize = getOptionalKeyAsType(map, MAX_WORK_QUEUE_SIZE_KEY, Number.class, CONNECTIONS_KEY, DEFAULT_MAX_WORK_QUEUE_SIZE);
+        this.maxWorkQueueDataSize = getOptionalKeyAsType(map, MAX_WORK_QUEUE_DATA_SIZE_KEY, String.class, CONNECTIONS_KEY, DEFAULT_MAX_QUEUE_DATA_SIZE);
+        this.flowfileExpiration = getOptionalKeyAsType(map, FLOWFILE_EXPIRATION__KEY, String.class, CONNECTIONS_KEY, DEFAULT_FLOWFILE_EXPIRATION);
+        this.queuePrioritizerClass = getOptionalKeyAsType(map, QUEUE_PRIORITIZER_CLASS_KEY, String.class, CONNECTIONS_KEY, "");
+    }
+
+    /**
+     * Writes every field of this connection into a map obtained from {@code mapSupplier}.
+     *
+     * @return the map representation of this connection
+     */
+    @Override
+    public Map<String, Object> toMap() {
+        final Map<String, Object> serialized = mapSupplier.get();
+
+        serialized.put(NAME_KEY, this.name);
+        serialized.put(SOURCE_NAME_KEY, this.sourceName);
+        serialized.put(SOURCE_RELATIONSHIP_NAME_KEY, this.sourceRelationshipName);
+        serialized.put(DESTINATION_NAME_KEY, this.destinationName);
+
+        serialized.put(MAX_WORK_QUEUE_SIZE_KEY, this.maxWorkQueueSize);
+        serialized.put(MAX_WORK_QUEUE_DATA_SIZE_KEY, this.maxWorkQueueDataSize);
+        serialized.put(FLOWFILE_EXPIRATION__KEY, this.flowfileExpiration);
+        serialized.put(QUEUE_PRIORITIZER_CLASS_KEY, this.queuePrioritizerClass);
+
+        return serialized;
+    }
+
+    /** @return the connection name */
+    public String getName() {
+        return this.name;
+    }
+
+    /** @return the name of the source component */
+    public String getSourceName() {
+        return this.sourceName;
+    }
+
+    /** @return the name of the source relationship */
+    public String getSourceRelationshipName() {
+        return this.sourceRelationshipName;
+    }
+
+    /** @return the name of the destination component */
+    public String getDestinationName() {
+        return this.destinationName;
+    }
+
+    /** @return the maximum work queue size */
+    public Number getMaxWorkQueueSize() {
+        return this.maxWorkQueueSize;
+    }
+
+    /** @return the maximum work queue data size, e.g. {@code "0 MB"} */
+    public String getMaxWorkQueueDataSize() {
+        return this.maxWorkQueueDataSize;
+    }
+
+    /** @return the flowfile expiration, e.g. {@code "0 sec"} */
+    public String getFlowfileExpiration() {
+        return this.flowfileExpiration;
+    }
+
+    /** @return the queue prioritizer class name */
+    public String getQueuePrioritizerClass() {
+        return this.queuePrioritizerClass;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ContentRepositorySchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ContentRepositorySchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ContentRepositorySchema.java
new file mode 100644
index 0000000..868cb79
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ContentRepositorySchema.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
+
+import java.util.Map;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ALWAYS_SYNC_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONTENT_REPO_KEY;
+
+/**
+ * Schema of the content repository section of the configuration.
+ *
+ * <p>All three settings are optional: the fields start at their {@code DEFAULT_*} values and the
+ * map constructor passes those same defaults to {@code getOptionalKeyAsType}.
+ */
+public class ContentRepositorySchema extends BaseSchema {
+    public static final String CONTENT_CLAIM_MAX_APPENDABLE_SIZE_KEY = "content claim max appendable size";
+    public static final String CONTENT_CLAIM_MAX_FLOW_FILES_KEY = "content claim max flow files";
+
+    public static final String DEFAULT_CONTENT_CLAIM_MAX_APPENDABLE_SIZE = "10 MB";
+    public static final int DEFAULT_CONTENT_CLAIM_MAX_FLOW_FILES = 100;
+    public static final boolean DEFAULT_ALWAYS_SYNC = false;
+
+    private String contentClaimMaxAppendableSize = DEFAULT_CONTENT_CLAIM_MAX_APPENDABLE_SIZE;
+    private Number contentClaimMaxFlowFiles = DEFAULT_CONTENT_CLAIM_MAX_FLOW_FILES;
+    private Boolean alwaysSync = DEFAULT_ALWAYS_SYNC;
+
+    /** Creates a schema whose fields keep their default values. */
+    public ContentRepositorySchema() {
+    }
+
+    /**
+     * Creates a schema from the given section map.
+     *
+     * @param map the raw contents of the content repository section
+     */
+    public ContentRepositorySchema(Map map) {
+        this.contentClaimMaxAppendableSize = getOptionalKeyAsType(
+                map, CONTENT_CLAIM_MAX_APPENDABLE_SIZE_KEY, String.class, CONTENT_REPO_KEY, DEFAULT_CONTENT_CLAIM_MAX_APPENDABLE_SIZE);
+        this.contentClaimMaxFlowFiles = getOptionalKeyAsType(
+                map, CONTENT_CLAIM_MAX_FLOW_FILES_KEY, Number.class, CONTENT_REPO_KEY, DEFAULT_CONTENT_CLAIM_MAX_FLOW_FILES);
+        this.alwaysSync = getOptionalKeyAsType(
+                map, ALWAYS_SYNC_KEY, Boolean.class, CONTENT_REPO_KEY, DEFAULT_ALWAYS_SYNC);
+    }
+
+    /**
+     * Writes the three settings into a map obtained from {@code mapSupplier}.
+     *
+     * @return the map representation of this section
+     */
+    @Override
+    public Map<String, Object> toMap() {
+        final Map<String, Object> serialized = mapSupplier.get();
+        serialized.put(CONTENT_CLAIM_MAX_APPENDABLE_SIZE_KEY, this.contentClaimMaxAppendableSize);
+        serialized.put(CONTENT_CLAIM_MAX_FLOW_FILES_KEY, this.contentClaimMaxFlowFiles);
+        serialized.put(ALWAYS_SYNC_KEY, this.alwaysSync);
+        return serialized;
+    }
+
+    /** @return the maximum appendable size of a content claim, e.g. {@code "10 MB"} */
+    public String getContentClaimMaxAppendableSize() {
+        return this.contentClaimMaxAppendableSize;
+    }
+
+    /** @return the maximum number of flow files per content claim */
+    public Number getContentClaimMaxFlowFiles() {
+        return this.contentClaimMaxFlowFiles;
+    }
+
+    /** @return the always-sync flag, unboxed from the stored {@code Boolean} */
+    public boolean getAlwaysSync() {
+        return this.alwaysSync;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/CorePropertiesSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/CorePropertiesSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/CorePropertiesSchema.java
new file mode 100644
index 0000000..ce30d9c
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/CorePropertiesSchema.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
+
+import java.util.Map;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CORE_PROPS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.MAX_CONCURRENT_THREADS_KEY;
+
+/**
+ * Schema of the core properties section of the configuration.
+ *
+ * <p>Every setting is optional: the fields start at their {@code DEFAULT_*} values and the map
+ * constructor passes those same defaults to {@code getOptionalKeyAsType}.
+ */
+public class CorePropertiesSchema extends BaseSchema {
+
+    public static final String FLOW_CONTROLLER_SHUTDOWN_PERIOD_KEY = "flow controller graceful shutdown period";
+    public static final String FLOW_SERVICE_WRITE_DELAY_INTERVAL_KEY = "flow service write delay interval";
+    public static final String ADMINISTRATIVE_YIELD_DURATION_KEY = "administrative yield duration";
+    public static final String BORED_YIELD_DURATION_KEY = "bored yield duration";
+
+    public static final String DEFAULT_FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD = "10 sec";
+    public static final String DEFAULT_FLOW_SERVICE_WRITE_DELAY_INTERVAL = "500 ms";
+    public static final String DEFAULT_ADMINISTRATIVE_YIELD_DURATION = "30 sec";
+    public static final String DEFAULT_BORED_YIELD_DURATION = "10 millis";
+    public static final int DEFAULT_MAX_CONCURRENT_THREADS = 1;
+
+    private String flowControllerGracefulShutdownPeriod = DEFAULT_FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD;
+    private String flowServiceWriteDelayInterval = DEFAULT_FLOW_SERVICE_WRITE_DELAY_INTERVAL;
+    private String administrativeYieldDuration = DEFAULT_ADMINISTRATIVE_YIELD_DURATION;
+    private String boredYieldDuration = DEFAULT_BORED_YIELD_DURATION;
+    private Number maxConcurrentThreads = DEFAULT_MAX_CONCURRENT_THREADS;
+
+    /** Creates a schema whose fields keep their default values. */
+    public CorePropertiesSchema() {
+    }
+
+    /**
+     * Creates a schema from the given section map.
+     *
+     * @param map the raw contents of the core properties section
+     */
+    public CorePropertiesSchema(Map map) {
+        this.flowControllerGracefulShutdownPeriod = getOptionalKeyAsType(
+                map, FLOW_CONTROLLER_SHUTDOWN_PERIOD_KEY, String.class, CORE_PROPS_KEY, DEFAULT_FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD);
+        this.flowServiceWriteDelayInterval = getOptionalKeyAsType(
+                map, FLOW_SERVICE_WRITE_DELAY_INTERVAL_KEY, String.class, CORE_PROPS_KEY, DEFAULT_FLOW_SERVICE_WRITE_DELAY_INTERVAL);
+        this.administrativeYieldDuration = getOptionalKeyAsType(
+                map, ADMINISTRATIVE_YIELD_DURATION_KEY, String.class, CORE_PROPS_KEY, DEFAULT_ADMINISTRATIVE_YIELD_DURATION);
+        this.boredYieldDuration = getOptionalKeyAsType(
+                map, BORED_YIELD_DURATION_KEY, String.class, CORE_PROPS_KEY, DEFAULT_BORED_YIELD_DURATION);
+        this.maxConcurrentThreads = getOptionalKeyAsType(
+                map, MAX_CONCURRENT_THREADS_KEY, Number.class, CORE_PROPS_KEY, DEFAULT_MAX_CONCURRENT_THREADS);
+    }
+
+    /**
+     * Writes the five settings into a map obtained from {@code mapSupplier}.
+     *
+     * @return the map representation of this section
+     */
+    @Override
+    public Map<String, Object> toMap() {
+        final Map<String, Object> serialized = mapSupplier.get();
+        serialized.put(FLOW_CONTROLLER_SHUTDOWN_PERIOD_KEY, this.flowControllerGracefulShutdownPeriod);
+        serialized.put(FLOW_SERVICE_WRITE_DELAY_INTERVAL_KEY, this.flowServiceWriteDelayInterval);
+        serialized.put(ADMINISTRATIVE_YIELD_DURATION_KEY, this.administrativeYieldDuration);
+        serialized.put(BORED_YIELD_DURATION_KEY, this.boredYieldDuration);
+        serialized.put(MAX_CONCURRENT_THREADS_KEY, this.maxConcurrentThreads);
+        return serialized;
+    }
+
+    /** @return the flow controller graceful shutdown period, e.g. {@code "10 sec"} */
+    public String getFlowControllerGracefulShutdownPeriod() {
+        return this.flowControllerGracefulShutdownPeriod;
+    }
+
+    /** @return the flow service write delay interval, e.g. {@code "500 ms"} */
+    public String getFlowServiceWriteDelayInterval() {
+        return this.flowServiceWriteDelayInterval;
+    }
+
+    /** @return the administrative yield duration, e.g. {@code "30 sec"} */
+    public String getAdministrativeYieldDuration() {
+        return this.administrativeYieldDuration;
+    }
+
+    /** @return the bored yield duration, e.g. {@code "10 millis"} */
+    public String getBoredYieldDuration() {
+        return this.boredYieldDuration;
+    }
+
+    /** @return the maximum number of concurrent threads */
+    public Number getMaxConcurrentThreads() {
+        return this.maxConcurrentThreads;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowControllerSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowControllerSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowControllerSchema.java
new file mode 100644
index 0000000..3306029
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowControllerSchema.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
+
+import java.util.Map;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMMENT_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOW_CONTROLLER_PROPS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY;
+
+/**
+ *
+ */
+public class FlowControllerSchema extends BaseSchema {
+ private String name;
+ private String comment;
+
+ public FlowControllerSchema(Map map) {
+ name = getRequiredKeyAsType(map, NAME_KEY, String.class, FLOW_CONTROLLER_PROPS_KEY);
+ comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, FLOW_CONTROLLER_PROPS_KEY, "");
+ }
+
+ @Override
+ public Map<String, Object> toMap() {
+ Map<String, Object> result = mapSupplier.get();
+ result.put(NAME_KEY, name);
+ result.put(COMMENT_KEY, comment);
+ return result;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getComment() {
+ return comment;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowFileRepositorySchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowFileRepositorySchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowFileRepositorySchema.java
new file mode 100644
index 0000000..cd7f456
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/FlowFileRepositorySchema.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
+
+import java.util.Map;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ALWAYS_SYNC_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.FLOWFILE_REPO_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SWAP_PROPS_KEY;
+
+/**
+ *
+ */
+public class FlowFileRepositorySchema extends BaseSchema {
+ public static final String PARTITIONS_KEY = "partitions";
+ public static final String CHECKPOINT_INTERVAL_KEY = "checkpoint interval";
+ public static final int DEFAULT_PARTITIONS = 256;
+ public static final String DEFAULT_CHECKPOINT_INTERVAL = "2 mins";
+ public static final boolean DEFAULT_ALWAYS_SYNC = false;
+
+ private Number partitions = DEFAULT_PARTITIONS;
+ private String checkpointInterval = DEFAULT_CHECKPOINT_INTERVAL;
+ private Boolean alwaysSync = DEFAULT_ALWAYS_SYNC;
+ private SwapSchema swapProperties;
+
+ public FlowFileRepositorySchema() {
+ swapProperties = new SwapSchema();
+ }
+
+ public FlowFileRepositorySchema(Map map) {
+ partitions = getOptionalKeyAsType(map, PARTITIONS_KEY, Number.class, FLOWFILE_REPO_KEY, DEFAULT_PARTITIONS);
+ checkpointInterval = getOptionalKeyAsType(map, CHECKPOINT_INTERVAL_KEY, String.class, FLOWFILE_REPO_KEY, DEFAULT_CHECKPOINT_INTERVAL);
+ alwaysSync = getOptionalKeyAsType(map, ALWAYS_SYNC_KEY, Boolean.class, FLOWFILE_REPO_KEY, DEFAULT_ALWAYS_SYNC);
+
+ swapProperties = getMapAsType(map, SWAP_PROPS_KEY, SwapSchema.class, FLOWFILE_REPO_KEY, false);
+ addIssuesIfNotNull(swapProperties);
+ }
+
+ @Override
+ public Map<String, Object> toMap() {
+ Map<String, Object> result = mapSupplier.get();
+ result.put(PARTITIONS_KEY, partitions);
+ result.put(CHECKPOINT_INTERVAL_KEY, checkpointInterval);
+ result.put(ALWAYS_SYNC_KEY, alwaysSync);
+ putIfNotNull(result, SWAP_PROPS_KEY, swapProperties);
+ return result;
+ }
+
+ public Number getPartitions() {
+ return partitions;
+ }
+
+ public String getCheckpointInterval() {
+ return checkpointInterval;
+ }
+
+ public boolean getAlwaysSync() {
+ return alwaysSync;
+ }
+
+ public SwapSchema getSwapProperties() {
+ return swapProperties;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java
new file mode 100644
index 0000000..d008df5
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.MAX_CONCURRENT_TASKS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROCESSORS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_PERIOD_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_STRATEGY_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.YIELD_PERIOD_KEY;
+
+/**
+ *
+ */
+public class ProcessorSchema extends BaseSchema {
+ public static final String CLASS_KEY = "class";
+ public static final String PENALIZATION_PERIOD_KEY = "penalization period";
+ public static final String RUN_DURATION_NANOS_KEY = "run duration nanos";
+ public static final String AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY = "auto-terminated relationships list";
+ public static final String PROCESSOR_PROPS_KEY = "Properties";
+ public static final int DEFAULT_MAX_CONCURRENT_TASKS = 1;
+ public static final String DEFAULT_PENALIZATION_PERIOD = "30 sec";
+ public static final String DEFAULT_YIELD_DURATION = "1 sec";
+ public static final long DEFAULT_RUN_DURATION_NANOS = 0;
+ public static final List<String> DEFAULT_AUTO_TERMINATED_RELATIONSHIPS_LIST = Collections.emptyList();
+ public static final Map<String, Object> DEFAULT_PROPERTIES = Collections.emptyMap();
+ public static final String IT_IS_NOT_A_VALID_SCHEDULING_STRATEGY = "it is not a valid scheduling strategy";
+
+ private String name;
+ private String processorClass;
+ private String schedulingStrategy;
+ private String schedulingPeriod;
+ private Number maxConcurrentTasks = DEFAULT_MAX_CONCURRENT_TASKS;
+ private String penalizationPeriod = DEFAULT_PENALIZATION_PERIOD;
+ private String yieldPeriod = DEFAULT_YIELD_DURATION;
+ private Number runDurationNanos = DEFAULT_RUN_DURATION_NANOS;
+ private List<String> autoTerminatedRelationshipsList = DEFAULT_AUTO_TERMINATED_RELATIONSHIPS_LIST;
+ private Map<String, Object> properties = DEFAULT_PROPERTIES;
+
+ public ProcessorSchema(Map map) {
+ name = getRequiredKeyAsType(map, NAME_KEY, String.class, PROCESSORS_KEY);
+ processorClass = getRequiredKeyAsType(map, CLASS_KEY, String.class, PROCESSORS_KEY);
+ schedulingStrategy = getRequiredKeyAsType(map, SCHEDULING_STRATEGY_KEY, String.class, PROCESSORS_KEY);
+ if (schedulingStrategy != null && !isSchedulingStrategy(schedulingStrategy)) {
+ addValidationIssue(SCHEDULING_STRATEGY_KEY, PROCESSORS_KEY, IT_IS_NOT_A_VALID_SCHEDULING_STRATEGY);
+ }
+ schedulingPeriod = getRequiredKeyAsType(map, SCHEDULING_PERIOD_KEY, String.class, PROCESSORS_KEY);
+
+ maxConcurrentTasks = getOptionalKeyAsType(map, MAX_CONCURRENT_TASKS_KEY, Number.class, PROCESSORS_KEY, DEFAULT_MAX_CONCURRENT_TASKS);
+ penalizationPeriod = getOptionalKeyAsType(map, PENALIZATION_PERIOD_KEY, String.class, PROCESSORS_KEY, DEFAULT_PENALIZATION_PERIOD);
+ yieldPeriod = getOptionalKeyAsType(map, YIELD_PERIOD_KEY, String.class, PROCESSORS_KEY, DEFAULT_YIELD_DURATION);
+ runDurationNanos = getOptionalKeyAsType(map, RUN_DURATION_NANOS_KEY, Number.class, PROCESSORS_KEY, DEFAULT_RUN_DURATION_NANOS);
+ autoTerminatedRelationshipsList = getOptionalKeyAsType(map, AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY, List.class, PROCESSORS_KEY, DEFAULT_AUTO_TERMINATED_RELATIONSHIPS_LIST);
+ properties = getOptionalKeyAsType(map, PROCESSOR_PROPS_KEY, Map.class, PROCESSORS_KEY, DEFAULT_PROPERTIES);
+ }
+
+ private static boolean isSchedulingStrategy(String string) {
+ try {
+ SchedulingStrategy.valueOf(string);
+ } catch (Exception e) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public Map<String, Object> toMap() {
+ Map<String, Object> result = mapSupplier.get();
+ result.put(NAME_KEY, name);
+ result.put(CLASS_KEY, processorClass);
+ result.put(MAX_CONCURRENT_TASKS_KEY, maxConcurrentTasks);
+ result.put(SCHEDULING_STRATEGY_KEY, schedulingStrategy);
+ result.put(SCHEDULING_PERIOD_KEY, schedulingPeriod);
+ result.put(PENALIZATION_PERIOD_KEY, penalizationPeriod);
+ result.put(YIELD_PERIOD_KEY, yieldPeriod);
+ result.put(RUN_DURATION_NANOS_KEY, runDurationNanos);
+ result.put(AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY, autoTerminatedRelationshipsList);
+ result.put(PROCESSOR_PROPS_KEY, new TreeMap<>(properties));
+ return result;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getProcessorClass() {
+ return processorClass;
+ }
+
+ public Number getMaxConcurrentTasks() {
+ return maxConcurrentTasks;
+ }
+
+ public String getSchedulingStrategy() {
+ return schedulingStrategy;
+ }
+
+ public String getSchedulingPeriod() {
+ return schedulingPeriod;
+ }
+
+ public String getPenalizationPeriod() {
+ return penalizationPeriod;
+ }
+
+ public String getYieldPeriod() {
+ return yieldPeriod;
+ }
+
+ public Number getRunDurationNanos() {
+ return runDurationNanos;
+ }
+
+ public List<String> getAutoTerminatedRelationshipsList() {
+ return autoTerminatedRelationshipsList;
+ }
+
+ public Map<String, Object> getProperties() {
+ return properties;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java
new file mode 100644
index 0000000..b12adb7
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceReportingSchema.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+
+import java.util.Map;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMMENT_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPORTING_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_PERIOD_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_STRATEGY_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.USE_COMPRESSION_KEY;
+import static org.apache.nifi.minifi.commons.schema.RemoteProcessingGroupSchema.TIMEOUT_KEY;
+
+/**
+ *
+ */
+public class ProvenanceReportingSchema extends BaseSchema {
+ public static final String DESTINATION_URL_KEY = "destination url";
+ public static final String PORT_NAME_KEY = "port name";
+ public static final String ORIGINATING_URL_KEY = "originating url";
+ public static final String BATCH_SIZE_KEY = "batch size";
+
+ public static final String DEFAULT_ORGINATING_URL = "http://${hostname(true)}:8080/nifi";
+ public static final String DEFAULT_TIMEOUT = "30 secs";
+ public static final int DEFAULT_BATCH_SIZE = 1000;
+ public static final boolean DEFAULT_USE_COMPRESSION = true;
+
+ private String schedulingStrategy;
+ private String schedulingPeriod;
+ private String destinationUrl;
+ private String portName;
+
+ private String comment;
+ private String originatingUrl = DEFAULT_ORGINATING_URL;
+ private Boolean useCompression = DEFAULT_USE_COMPRESSION;
+ private String timeout = DEFAULT_TIMEOUT;
+ private Number batchSize = DEFAULT_BATCH_SIZE;
+
+ public ProvenanceReportingSchema(Map map) {
+ schedulingStrategy = getRequiredKeyAsType(map, SCHEDULING_STRATEGY_KEY, String.class, PROVENANCE_REPORTING_KEY);
+ if (schedulingStrategy != null) {
+ try {
+ SchedulingStrategy.valueOf(schedulingStrategy);
+ } catch (IllegalArgumentException e) {
+ addValidationIssue(SCHEDULING_STRATEGY_KEY, PROVENANCE_REPORTING_KEY, "it is not a valid scheduling strategy");
+ }
+ }
+
+ schedulingPeriod = getRequiredKeyAsType(map, SCHEDULING_PERIOD_KEY, String.class, PROVENANCE_REPORTING_KEY);
+ destinationUrl = getRequiredKeyAsType(map, DESTINATION_URL_KEY, String.class, PROVENANCE_REPORTING_KEY);
+ portName = getRequiredKeyAsType(map, PORT_NAME_KEY, String.class, PROVENANCE_REPORTING_KEY);
+
+ comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, PROVENANCE_REPORTING_KEY, "");
+ originatingUrl = getOptionalKeyAsType(map, ORIGINATING_URL_KEY, String.class, PROVENANCE_REPORTING_KEY, DEFAULT_ORGINATING_URL);
+ useCompression = getOptionalKeyAsType(map, USE_COMPRESSION_KEY, Boolean.class, PROVENANCE_REPORTING_KEY, DEFAULT_USE_COMPRESSION);
+ timeout = getOptionalKeyAsType(map, TIMEOUT_KEY, String.class, PROVENANCE_REPORTING_KEY, DEFAULT_TIMEOUT);
+ batchSize = getOptionalKeyAsType(map, BATCH_SIZE_KEY, Number.class, PROVENANCE_REPORTING_KEY, DEFAULT_BATCH_SIZE);
+ }
+
+ @Override
+ public Map<String, Object> toMap() {
+ Map<String, Object> result = mapSupplier.get();
+ result.put(COMMENT_KEY, comment);
+ result.put(SCHEDULING_STRATEGY_KEY, schedulingStrategy);
+ result.put(SCHEDULING_PERIOD_KEY, schedulingPeriod);
+ result.put(DESTINATION_URL_KEY, destinationUrl);
+ result.put(PORT_NAME_KEY, portName);
+ result.put(ORIGINATING_URL_KEY, originatingUrl);
+ result.put(USE_COMPRESSION_KEY, useCompression);
+ result.put(TIMEOUT_KEY, timeout);
+ result.put(BATCH_SIZE_KEY, batchSize);
+ return result;
+ }
+
+ public String getComment() {
+ return comment;
+ }
+
+ public String getSchedulingStrategy() {
+ return schedulingStrategy;
+ }
+
+ public String getSchedulingPeriod() {
+ return schedulingPeriod;
+ }
+
+ public String getDestinationUrl() {
+ return destinationUrl;
+ }
+
+ public String getPortName() {
+ return portName;
+ }
+
+ public String getOriginatingUrl() {
+ return originatingUrl;
+ }
+
+ public boolean getUseCompression() {
+ return useCompression;
+ }
+
+ public String getTimeout() {
+ return timeout;
+ }
+
+ public Number getBatchSize() {
+ return batchSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceRepositorySchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceRepositorySchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceRepositorySchema.java
new file mode 100644
index 0000000..ac858ef
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProvenanceRepositorySchema.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
+
+import java.util.Map;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROVENANCE_REPO_KEY;
+
+ /**
+  * Schema for the provenance repository settings; it currently holds only the rollover time.
+  */
+ public class ProvenanceRepositorySchema extends BaseSchema {
+     public static final String PROVENANCE_REPO_ROLLOVER_TIME_KEY = "provenance rollover time";
+
+     public static final String DEFAULT_PROVENANCE_ROLLOVER_TIME = "1 min";
+
+     private String provenanceRepoRolloverTime = DEFAULT_PROVENANCE_ROLLOVER_TIME;
+
+     /**
+      * Creates a schema that keeps {@link #DEFAULT_PROVENANCE_ROLLOVER_TIME}.
+      */
+     public ProvenanceRepositorySchema(){
+     }
+
+     /**
+      * Reads the provenance repository settings from the supplied map.
+      *
+      * @param map source values; the rollover time is optional and falls back to
+      *            {@link #DEFAULT_PROVENANCE_ROLLOVER_TIME}
+      */
+     public ProvenanceRepositorySchema(Map map) {
+         provenanceRepoRolloverTime = getOptionalKeyAsType(map, PROVENANCE_REPO_ROLLOVER_TIME_KEY, String.class,
+                 PROVENANCE_REPO_KEY, DEFAULT_PROVENANCE_ROLLOVER_TIME);
+     }
+
+     /**
+      * Serializes this schema back into a map.
+      *
+      * @return the map obtained from {@code mapSupplier}, holding the rollover time
+      */
+     @Override
+     public Map<String, Object> toMap() {
+         Map<String, Object> result = mapSupplier.get();
+         result.put(PROVENANCE_REPO_ROLLOVER_TIME_KEY, provenanceRepoRolloverTime);
+         return result;
+     }
+
+     /**
+      * @return the provenance repository rollover time
+      */
+     public String getProvenanceRepoRolloverTime() {
+         return provenanceRepoRolloverTime;
+     }
+
+     /**
+      * Returns the rollover time value; despite the name it does not return a key.
+      * Retained for existing callers; prefer {@link #getProvenanceRepoRolloverTime()}.
+      *
+      * @return the provenance repository rollover time
+      */
+     public String getProvenanceRepoRolloverTimeKey() {
+         return getProvenanceRepoRolloverTime();
+     }
+ }
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteInputPortSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteInputPortSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteInputPortSchema.java
new file mode 100644
index 0000000..f12f443
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteInputPortSchema.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
+
+import java.util.Map;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMMENT_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ID_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.INPUT_PORTS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.MAX_CONCURRENT_TASKS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.USE_COMPRESSION_KEY;
+
+/**
+ *
+ */
+public class RemoteInputPortSchema extends BaseSchema {
+ public static final String DEFAULT_COMMENT = "";
+ public static final int DEFAULT_MAX_CONCURRENT_TASKS = 1;
+ public static final boolean DEFAULT_USE_COMPRESSION = true;
+
+ private String id;
+ private String name;
+
+ private String comment = DEFAULT_COMMENT;
+ private Number maxConcurrentTasks = DEFAULT_MAX_CONCURRENT_TASKS;
+ private Boolean useCompression = DEFAULT_USE_COMPRESSION;
+
+ public RemoteInputPortSchema(Map map) {
+ id = getRequiredKeyAsType(map, ID_KEY, String.class, INPUT_PORTS_KEY);
+ name = getRequiredKeyAsType(map, NAME_KEY, String.class, INPUT_PORTS_KEY);
+
+ comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, INPUT_PORTS_KEY, DEFAULT_COMMENT);
+ maxConcurrentTasks = getOptionalKeyAsType(map, MAX_CONCURRENT_TASKS_KEY, Number.class, INPUT_PORTS_KEY, DEFAULT_MAX_CONCURRENT_TASKS);
+ useCompression = getOptionalKeyAsType(map, USE_COMPRESSION_KEY, Boolean.class, INPUT_PORTS_KEY, DEFAULT_USE_COMPRESSION);
+ }
+
+ @Override
+ public Map<String, Object> toMap() {
+ Map<String, Object> result = mapSupplier.get();
+ result.put(ID_KEY, id);
+ result.put(NAME_KEY, name);
+ result.put(COMMENT_KEY, comment);
+ result.put(MAX_CONCURRENT_TASKS_KEY, maxConcurrentTasks);
+ result.put(USE_COMPRESSION_KEY, useCompression);
+ return result;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getComment() {
+ return comment;
+ }
+
+ public Number getMax_concurrent_tasks() {
+ return maxConcurrentTasks;
+ }
+
+ public boolean getUseCompression() {
+ return useCompression;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java
new file mode 100644
index 0000000..3fb351e
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMMENT_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.INPUT_PORTS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.REMOTE_PROCESSING_GROUPS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.YIELD_PERIOD_KEY;
+
+/**
+ *
+ */
+public class RemoteProcessingGroupSchema extends BaseSchema {
+ public static final String URL_KEY = "url";
+ public static final String TIMEOUT_KEY = "timeout";
+
+ public static final String DEFAULT_COMMENT = "";
+ public static final String DEFAULT_TIMEOUT = "30 secs";
+ public static final String DEFAULT_YIELD_PERIOD = "10 sec";
+
+ private String name;
+ private String url;
+ private List<RemoteInputPortSchema> inputPorts;
+
+ private String comment = DEFAULT_COMMENT;
+ private String timeout = DEFAULT_TIMEOUT;
+ private String yieldPeriod = DEFAULT_YIELD_PERIOD;
+
+ public RemoteProcessingGroupSchema(Map map) {
+ name = getRequiredKeyAsType(map, NAME_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY);
+ url = getRequiredKeyAsType(map, URL_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY);
+ inputPorts = getRequiredKeyAsType(map, INPUT_PORTS_KEY, List.class, REMOTE_PROCESSING_GROUPS_KEY);
+ if (inputPorts != null) {
+ transformListToType(inputPorts, "input port", RemoteInputPortSchema.class, INPUT_PORTS_KEY);
+
+ for (RemoteInputPortSchema remoteInputPortSchema: inputPorts) {
+ addIssuesIfNotNull(remoteInputPortSchema);
+ }
+ }
+
+ comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY, DEFAULT_COMMENT);
+ timeout = getOptionalKeyAsType(map, TIMEOUT_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY, DEFAULT_TIMEOUT);
+ yieldPeriod = getOptionalKeyAsType(map, YIELD_PERIOD_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY, DEFAULT_YIELD_PERIOD);
+ }
+
+ @Override
+ public Map<String, Object> toMap() {
+ Map<String, Object> result = mapSupplier.get();
+ result.put(NAME_KEY, name);
+ result.put(URL_KEY, url);
+ result.put(COMMENT_KEY, comment);
+ result.put(TIMEOUT_KEY, timeout);
+ result.put(YIELD_PERIOD_KEY, yieldPeriod);
+ putListIfNotNull(result, INPUT_PORTS_KEY, inputPorts);
+ return result;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getComment() {
+ return comment;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public String getTimeout() {
+ return timeout;
+ }
+
+ public String getYieldPeriod() {
+ return yieldPeriod;
+ }
+
+ public List<RemoteInputPortSchema> getInputPorts() {
+ return inputPorts;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SecurityPropertiesSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SecurityPropertiesSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SecurityPropertiesSchema.java
new file mode 100644
index 0000000..3f0c6c1
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SecurityPropertiesSchema.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
+
+import java.util.Map;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SECURITY_PROPS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SENSITIVE_PROPS_KEY;
+
+/**
+ * Schema for the security properties section of the configuration: keystore, truststore and
+ * SSL protocol settings, plus the nested sensitive properties section.
+ *
+ * <p>Every key is read as an optional string. Problems detected while reading the map are
+ * recorded as validation issues instead of being thrown.
+ */
+public class SecurityPropertiesSchema extends BaseSchema {
+
+    public static final String KEYSTORE_KEY = "keystore";
+    public static final String KEYSTORE_TYPE_KEY = "keystore type";
+    public static final String KEYSTORE_PASSWORD_KEY = "keystore password";
+    public static final String KEY_PASSWORD_KEY = "key password";
+    public static final String TRUSTSTORE_KEY = "truststore";
+    public static final String TRUSTSTORE_TYPE_KEY = "truststore type";
+    public static final String TRUSTSTORE_PASSWORD_KEY = "truststore password";
+    public static final String SSL_PROTOCOL_KEY = "ssl protocol";
+
+    // Reason reported when a keystore/truststore type is neither JKS nor PKCS12.
+    private static final String UNSUPPORTED_STORE_TYPE_REASON = "it is not a supported type (must be either PKCS12 or JKS format)";
+
+    private String keystore = "";
+    private String keystoreType = "";
+    private String keystorePassword = "";
+    private String keyPassword = "";
+    private String truststore = "";
+    private String truststoreType = "";
+    private String truststorePassword = "";
+    private String sslProtocol = "";
+    private SensitivePropsSchema sensitiveProps;
+
+    /**
+     * Creates a schema in which every string setting is empty and the sensitive properties
+     * section is a default {@link SensitivePropsSchema}.
+     */
+    public SecurityPropertiesSchema() {
+        this.sensitiveProps = new SensitivePropsSchema();
+    }
+
+    /**
+     * Reads the security properties from the given map and records validation issues.
+     *
+     * <p>Store types are checked as they are read; when an SSL protocol is given, the protocol
+     * value and the presence of the related keystore/truststore settings are checked as well.
+     * Issues of the nested sensitive properties section are appended last.
+     *
+     * @param map the parsed security properties section
+     */
+    public SecurityPropertiesSchema(Map map) {
+        this.keystore = readOptionalString(map, KEYSTORE_KEY);
+
+        this.keystoreType = readOptionalString(map, KEYSTORE_TYPE_KEY);
+        reportIfUnsupportedStoreType(KEYSTORE_TYPE_KEY, this.keystoreType);
+
+        this.keystorePassword = readOptionalString(map, KEYSTORE_PASSWORD_KEY);
+        this.keyPassword = readOptionalString(map, KEY_PASSWORD_KEY);
+        this.truststore = readOptionalString(map, TRUSTSTORE_KEY);
+
+        this.truststoreType = readOptionalString(map, TRUSTSTORE_TYPE_KEY);
+        reportIfUnsupportedStoreType(TRUSTSTORE_TYPE_KEY, this.truststoreType);
+
+        this.truststorePassword = readOptionalString(map, TRUSTSTORE_PASSWORD_KEY);
+
+        this.sslProtocol = readOptionalString(map, SSL_PROTOCOL_KEY);
+        if (!isNullOrEmpty(this.sslProtocol)) {
+            checkSslProtocolValue();
+            checkStoresRequiredForSsl();
+        }
+
+        this.sensitiveProps = getMapAsType(map, SENSITIVE_PROPS_KEY, SensitivePropsSchema.class, SECURITY_PROPS_KEY, false);
+        addIssuesIfNotNull(this.sensitiveProps);
+    }
+
+    /** Reads {@code key} as an optional string of this section, defaulting to the empty string. */
+    private String readOptionalString(Map map, String key) {
+        return getOptionalKeyAsType(map, key, String.class, SECURITY_PROPS_KEY, "");
+    }
+
+    /** Records an issue for {@code key} when a non-empty store type is neither JKS nor PKCS12 (case-insensitive). */
+    private void reportIfUnsupportedStoreType(String key, String storeType) {
+        if (isNullOrEmpty(storeType)) {
+            return;
+        }
+        final boolean supported = "JKS".equalsIgnoreCase(storeType) || "PKCS12".equalsIgnoreCase(storeType);
+        if (!supported) {
+            addValidationIssue(key, SECURITY_PROPS_KEY, UNSUPPORTED_STORE_TYPE_REASON);
+        }
+    }
+
+    /** Records an issue when the (non-empty) SSL protocol is not one of the accepted names. */
+    private void checkSslProtocolValue() {
+        switch (this.sslProtocol) {
+            case "SSL":
+            case "SSLv2Hello":
+            case "SSLv3":
+            case "TLS":
+            case "TLSv1":
+            case "TLSv1.1":
+            case "TLSv1.2":
+                // accepted value, nothing to report
+                break;
+            default:
+                addValidationIssue(SSL_PROTOCOL_KEY, SECURITY_PROPS_KEY, "it is not an allowable value of SSL protocol");
+                break;
+        }
+    }
+
+    /** Records issues for keystore/truststore settings that must accompany an SSL protocol. */
+    private void checkStoresRequiredForSsl() {
+        if (isNullOrEmpty(this.keystore)) {
+            validationIssues.add("When the '" + SSL_PROTOCOL_KEY + "' key of '" + SECURITY_PROPS_KEY + "' is set, the '" + KEYSTORE_KEY + "' must also be set");
+        } else {
+            final boolean keystoreDetailMissing =
+                    isNullOrEmpty(this.keystoreType) || isNullOrEmpty(this.keystorePassword) || isNullOrEmpty(this.keyPassword);
+            if (keystoreDetailMissing) {
+                validationIssues.add("When the '" + KEYSTORE_KEY + "' key of '" + SECURITY_PROPS_KEY + "' is set, the '" + KEYSTORE_TYPE_KEY + "', '" + KEYSTORE_PASSWORD_KEY
+                        + "' and '" + KEY_PASSWORD_KEY + "' all must also be set");
+            }
+        }
+
+        // The truststore itself is optional, but once given its type and password are needed too.
+        if (!isNullOrEmpty(this.truststore)) {
+            final boolean truststoreDetailMissing = isNullOrEmpty(this.truststoreType) || isNullOrEmpty(this.truststorePassword);
+            if (truststoreDetailMissing) {
+                validationIssues.add("When the '" + TRUSTSTORE_KEY + "' key of '" + SECURITY_PROPS_KEY + "' is set, the '" + TRUSTSTORE_TYPE_KEY + "' and '"
+                        + TRUSTSTORE_PASSWORD_KEY + "' must also be set");
+            }
+        }
+    }
+
+    /**
+     * Converts this schema into its map representation. All string settings are always
+     * written; the sensitive properties entry is only added when that section is not null.
+     *
+     * @return a map obtained from {@code mapSupplier} and filled with this schema's values
+     */
+    @Override
+    public Map<String, Object> toMap() {
+        final Map<String, Object> serialized = mapSupplier.get();
+        serialized.put(KEYSTORE_KEY, this.keystore);
+        serialized.put(KEYSTORE_TYPE_KEY, this.keystoreType);
+        serialized.put(KEYSTORE_PASSWORD_KEY, this.keystorePassword);
+        serialized.put(KEY_PASSWORD_KEY, this.keyPassword);
+        serialized.put(TRUSTSTORE_KEY, this.truststore);
+        serialized.put(TRUSTSTORE_TYPE_KEY, this.truststoreType);
+        serialized.put(TRUSTSTORE_PASSWORD_KEY, this.truststorePassword);
+        serialized.put(SSL_PROTOCOL_KEY, this.sslProtocol);
+        putIfNotNull(serialized, SENSITIVE_PROPS_KEY, this.sensitiveProps);
+        return serialized;
+    }
+
+    /**
+     * @return true when an SSL protocol has been configured (the value is neither null nor empty)
+     */
+    public boolean useSSL() {
+        return !isNullOrEmpty(this.sslProtocol);
+    }
+
+    /** @return the keystore setting */
+    public String getKeystore() {
+        return this.keystore;
+    }
+
+    /** @return the keystore type setting */
+    public String getKeystoreType() {
+        return this.keystoreType;
+    }
+
+    /** @return the keystore password setting */
+    public String getKeystorePassword() {
+        return this.keystorePassword;
+    }
+
+    /** @return the key password setting */
+    public String getKeyPassword() {
+        return this.keyPassword;
+    }
+
+    /** @return the truststore setting */
+    public String getTruststore() {
+        return this.truststore;
+    }
+
+    /** @return the truststore type setting */
+    public String getTruststoreType() {
+        return this.truststoreType;
+    }
+
+    /** @return the truststore password setting */
+    public String getTruststorePassword() {
+        return this.truststorePassword;
+    }
+
+    /** @return the SSL protocol setting */
+    public String getSslProtocol() {
+        return this.sslProtocol;
+    }
+
+    /** @return the nested sensitive properties section */
+    public SensitivePropsSchema getSensitiveProps() {
+        return this.sensitiveProps;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SensitivePropsSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SensitivePropsSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SensitivePropsSchema.java
new file mode 100644
index 0000000..93260ea
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SensitivePropsSchema.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
+
+import java.util.Map;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SENSITIVE_PROPS_KEY;
+
+/**
+ * Schema for the sensitive properties section: the key, the algorithm and the provider.
+ *
+ * <p>All three settings are optional. The algorithm and provider fall back to
+ * {@link #DEFAULT_ALGORITHM} and {@link #DEFAULT_PROVIDER}; the key falls back to the empty
+ * string.
+ */
+public class SensitivePropsSchema extends BaseSchema {
+    public static final String SENSITIVE_PROPS_KEY_KEY = "key";
+    public static final String SENSITIVE_PROPS_ALGORITHM_KEY = "algorithm";
+    public static final String SENSITIVE_PROPS_PROVIDER_KEY = "provider";
+
+    public static final String DEFAULT_ALGORITHM = "PBEWITHMD5AND256BITAES-CBC-OPENSSL";
+    public static final String DEFAULT_PROVIDER = "BC";
+
+    // Starts as the empty string so that a default-constructed schema agrees with one built
+    // from a map that has no key entry (which also yields ""), instead of exposing null.
+    private String key = "";
+    private String algorithm = DEFAULT_ALGORITHM;
+    private String provider = DEFAULT_PROVIDER;
+
+    /**
+     * Creates a schema with an empty key and the default algorithm and provider.
+     */
+    public SensitivePropsSchema() {
+    }
+
+    /**
+     * Reads the sensitive properties from the given map, applying the defaults for missing
+     * entries. An entry that is present but not a string is recorded as a validation issue.
+     *
+     * @param map the parsed sensitive properties section
+     */
+    public SensitivePropsSchema(Map map) {
+        key = getOptionalKeyAsType(map, SENSITIVE_PROPS_KEY_KEY, String.class, SENSITIVE_PROPS_KEY, "");
+        algorithm = getOptionalKeyAsType(map, SENSITIVE_PROPS_ALGORITHM_KEY, String.class, SENSITIVE_PROPS_KEY, DEFAULT_ALGORITHM);
+        provider = getOptionalKeyAsType(map, SENSITIVE_PROPS_PROVIDER_KEY, String.class, SENSITIVE_PROPS_KEY, DEFAULT_PROVIDER);
+    }
+
+    /**
+     * Converts this schema into its map representation.
+     *
+     * @return a map obtained from {@code mapSupplier} holding the key, algorithm and provider
+     */
+    @Override
+    public Map<String, Object> toMap() {
+        Map<String, Object> result = mapSupplier.get();
+        result.put(SENSITIVE_PROPS_KEY_KEY, key);
+        result.put(SENSITIVE_PROPS_ALGORITHM_KEY, algorithm);
+        result.put(SENSITIVE_PROPS_PROVIDER_KEY, provider);
+        return result;
+    }
+
+    /** @return the sensitive properties key; empty when none was configured */
+    public String getKey() {
+        return key;
+    }
+
+    /** @return the configured algorithm, {@link #DEFAULT_ALGORITHM} when none was given */
+    public String getAlgorithm() {
+        return algorithm;
+    }
+
+    /** @return the configured provider, {@link #DEFAULT_PROVIDER} when none was given */
+    public String getProvider() {
+        return provider;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SwapSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SwapSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SwapSchema.java
new file mode 100644
index 0000000..d38ce7a
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/SwapSchema.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
+
+import java.util.Map;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SWAP_PROPS_KEY;
+
+/**
+ * Schema for the swap properties section: the swap threshold and the period and thread
+ * count used for swapping in and out.
+ *
+ * <p>Every setting is optional and falls back to the matching {@code DEFAULT_*} constant.
+ */
+public class SwapSchema extends BaseSchema {
+    public static final String THRESHOLD_KEY = "threshold";
+    public static final String IN_PERIOD_KEY = "in period";
+    public static final String IN_THREADS_KEY = "in threads";
+    public static final String OUT_PERIOD_KEY = "out period";
+    public static final String OUT_THREADS_KEY = "out threads";
+
+    public static final int DEFAULT_THRESHOLD = 20000;
+    public static final String DEFAULT_IN_PERIOD = "5 sec";
+    public static final int DEFAULT_IN_THREADS = 1;
+    public static final String DEFAULT_OUT_PERIOD = "5 sec";
+    public static final int DEFAULT_OUT_THREADS = 4;
+
+    private Number threshold = DEFAULT_THRESHOLD;
+    private String inPeriod = DEFAULT_IN_PERIOD;
+    private Number inThreads = DEFAULT_IN_THREADS;
+    private String outPeriod = DEFAULT_OUT_PERIOD;
+    private Number outThreads = DEFAULT_OUT_THREADS;
+
+    /**
+     * Creates a schema populated entirely with the default values.
+     */
+    public SwapSchema() {
+    }
+
+    /**
+     * Reads the swap settings from the given map, applying the defaults for missing entries.
+     * An entry that is present but of the wrong type is recorded as a validation issue.
+     *
+     * @param map the parsed swap section
+     */
+    public SwapSchema(Map map) {
+        this.threshold = readNumber(map, THRESHOLD_KEY, DEFAULT_THRESHOLD);
+        this.inPeriod = readString(map, IN_PERIOD_KEY, DEFAULT_IN_PERIOD);
+        this.inThreads = readNumber(map, IN_THREADS_KEY, DEFAULT_IN_THREADS);
+        this.outPeriod = readString(map, OUT_PERIOD_KEY, DEFAULT_OUT_PERIOD);
+        this.outThreads = readNumber(map, OUT_THREADS_KEY, DEFAULT_OUT_THREADS);
+    }
+
+    /** Reads {@code key} as an optional number of the swap section. */
+    private Number readNumber(Map map, String key, Number fallback) {
+        return getOptionalKeyAsType(map, key, Number.class, SWAP_PROPS_KEY, fallback);
+    }
+
+    /** Reads {@code key} as an optional string of the swap section. */
+    private String readString(Map map, String key, String fallback) {
+        return getOptionalKeyAsType(map, key, String.class, SWAP_PROPS_KEY, fallback);
+    }
+
+    /**
+     * Converts this schema into its map representation.
+     *
+     * @return a map obtained from {@code mapSupplier} holding all five swap settings
+     */
+    @Override
+    public Map<String, Object> toMap() {
+        final Map<String, Object> serialized = mapSupplier.get();
+        serialized.put(THRESHOLD_KEY, this.threshold);
+        serialized.put(IN_PERIOD_KEY, this.inPeriod);
+        serialized.put(IN_THREADS_KEY, this.inThreads);
+        serialized.put(OUT_PERIOD_KEY, this.outPeriod);
+        serialized.put(OUT_THREADS_KEY, this.outThreads);
+        return serialized;
+    }
+
+    /** @return the swap threshold */
+    public Number getThreshold() {
+        return this.threshold;
+    }
+
+    /** @return the swap-in period */
+    public String getInPeriod() {
+        return this.inPeriod;
+    }
+
+    /** @return the number of swap-in threads */
+    public Number getInThreads() {
+        return this.inThreads;
+    }
+
+    /** @return the swap-out period */
+    public String getOutPeriod() {
+        return this.outPeriod;
+    }
+
+    /** @return the number of swap-out threads */
+    public Number getOutThreads() {
+        return this.outThreads;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/2d1e43e7/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java
new file mode 100644
index 0000000..8c85acb
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.schema.common;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * Base class of the configuration schema objects.
+ *
+ * <p>It offers helpers for reading typed values out of a parsed map, for collecting
+ * validation issues while doing so (problems are recorded, not thrown), and for turning a
+ * schema back into a map through {@link #toMap()}.
+ */
+public abstract class BaseSchema {
+    public static final String IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED = "it was not found and it is required";
+    protected final Supplier<Map<String, Object>> mapSupplier;
+
+    /**
+     * Creates a schema whose {@link #toMap()} output is built on a {@link LinkedHashMap}.
+     */
+    public BaseSchema() {
+        this(LinkedHashMap::new);
+    }
+
+    /**
+     * @param mapSupplier supplies the map instance that subclasses fill in {@link #toMap()}
+     */
+    public BaseSchema(Supplier<Map<String, Object>> mapSupplier) {
+        this.mapSupplier = mapSupplier;
+    }
+
+    /******* Validation Issue helper methods *******/
+    // Kept public because subclasses add to it directly.
+    public List<String> validationIssues = new LinkedList<>();
+
+    /**
+     * @return true when no validation issue has been recorded
+     */
+    public boolean isValid() {
+        return validationIssues.isEmpty();
+    }
+
+    /**
+     * @return the live list of recorded validation issues (not a copy)
+     */
+    public List<String> getValidationIssues() {
+        return validationIssues;
+    }
+
+    /**
+     * @return every issue wrapped in square brackets and joined with {@code ", "}; the empty
+     *         string when there are no issues
+     */
+    public String getValidationIssuesAsString() {
+        return validationIssues.stream()
+                .map(issue -> "[" + issue + "]")
+                .collect(Collectors.joining(", "));
+    }
+
+    /**
+     * Obtains a value and records the "not found and required" issue when it is null.
+     *
+     * @param supplier    produces the value
+     * @param keyName     key reported in the issue
+     * @param wrapperName section reported in the issue
+     * @return the supplied value, possibly null
+     */
+    public <T> T getAndValidateNotNull(Supplier<T> supplier, String keyName, String wrapperName) {
+        return getAndValidate(supplier, t -> t != null, keyName, wrapperName, IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED);
+    }
+
+    /**
+     * Obtains a value and records an issue when it does not satisfy the predicate.
+     *
+     * @param supplier    produces the value
+     * @param predicate   test the value has to pass
+     * @param keyName     key reported in the issue
+     * @param wrapperName section reported in the issue
+     * @param reason      reason reported in the issue
+     * @return the supplied value, whether or not it passed the test
+     */
+    public <T> T getAndValidate(Supplier<T> supplier, Predicate<T> predicate, String keyName, String wrapperName, String reason) {
+        T result = supplier.get();
+        if (!predicate.test(result)) {
+            addValidationIssue(keyName, wrapperName, reason);
+        }
+        return result;
+    }
+
+    /**
+     * Records an issue of the form {@code 'key' in section 'wrapper' because reason}.
+     */
+    public void addValidationIssue(String keyName, String wrapperName, String reason) {
+        validationIssues.add("'" + keyName + "' in section '" + wrapperName + "' because " + reason);
+    }
+
+    /**
+     * Appends all issues of the given schema to this one; does nothing for null.
+     */
+    public void addIssuesIfNotNull(BaseSchema baseSchema) {
+        if (baseSchema != null) {
+            validationIssues.addAll(baseSchema.getValidationIssues());
+        }
+    }
+
+    /******* Value Access/Interpretation helper methods *******/
+
+    /**
+     * Reads an optional value. Returns {@code defaultValue} when the key has no value, and
+     * null (with a recorded issue) when the value is not an instance of {@code targetClass}.
+     */
+    public <T> T getOptionalKeyAsType(Map valueMap, String key, Class<T> targetClass, String wrapperName, T defaultValue) {
+        return getKeyAsType(valueMap, key, targetClass, wrapperName, false, defaultValue);
+    }
+
+    /**
+     * Reads a required value. Returns null, with a recorded issue, when the key has no value
+     * or the value is not an instance of {@code targetClass}.
+     */
+    public <T> T getRequiredKeyAsType(Map valueMap, String key, Class<T> targetClass, String wrapperName) {
+        return getKeyAsType(valueMap, key, targetClass, wrapperName, true, null);
+    }
+
+    /**
+     * Shared implementation of the optional/required lookups.
+     *
+     * @param valueMap     map to read from
+     * @param key          key to look up
+     * @param targetClass  type the value has to be an instance of
+     * @param wrapperName  section reported in issues
+     * @param required     whether a missing value is an issue (only when no default is given)
+     * @param defaultValue value returned when the key has no value; may be null
+     * @return the value, the default, or null when an issue was recorded or nothing applies
+     */
+    <T> T getKeyAsType(Map valueMap, String key, Class<T> targetClass, String wrapperName, boolean required, T defaultValue) {
+        Object value = valueMap.get(key);
+        if (value == null) {
+            if (defaultValue != null) {
+                return defaultValue;
+            }
+            if (required) {
+                addValidationIssue(key, wrapperName, IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED);
+            }
+            return null;
+        }
+        if (targetClass.isInstance(value)) {
+            return targetClass.cast(value);
+        }
+        addValidationIssue(key, wrapperName, "it is found but could not be parsed as a " + targetClass.getSimpleName());
+        return null;
+    }
+
+
+    /**
+     * Reads the value under {@code key} and interprets it as {@code targetClass}; a missing,
+     * non-required value is replaced by a default instance of {@code targetClass}.
+     */
+    public <T> T getMapAsType(Map valueMap, String key, Class<T> targetClass, String wrapperName, boolean required) {
+        Object obj = valueMap.get(key);
+        return interpretValueAsType(obj, key, targetClass, wrapperName, required, true);
+    }
+
+    /**
+     * Same as the five-argument variant, but the caller decides through
+     * {@code instantiateIfNull} whether a missing, non-required value becomes a default
+     * instance or stays null.
+     */
+    public <T> T getMapAsType(Map valueMap, String key, Class targetClass, String wrapperName, boolean required, boolean instantiateIfNull) {
+        Object obj = valueMap.get(key);
+        return interpretValueAsType(obj, key, targetClass, wrapperName, required, instantiateIfNull);
+    }
+
+    /**
+     * Interprets every element of the list as {@code targetClass}, in place. Elements that
+     * cannot be interpreted (the result is null) are left untouched in the list; the reason
+     * is recorded as a validation issue naming the element by its index.
+     */
+    public <T> void transformListToType(List<T> list, String simpleListType, Class<T> targetClass, String wrapperName){
+        for (int i = 0; i < list.size(); i++) {
+            T obj = interpretValueAsType(list.get(i), simpleListType + " number " + i, targetClass, wrapperName, false, false);
+            if (obj != null) {
+                list.set(i, obj);
+            }
+        }
+    }
+
+    /**
+     * Turns a raw value into an instance of {@code targetClass}.
+     *
+     * <ul>
+     * <li>null: an issue when required, a default instance when {@code instantiateIfNull}, otherwise null</li>
+     * <li>a map: handed to the {@code targetClass(Map)} constructor</li>
+     * <li>already an instance of {@code targetClass}: returned as is</li>
+     * <li>anything else: an issue is recorded and null is returned</li>
+     * </ul>
+     */
+    // The casts are guarded: the value either comes from a targetClass constructor or has
+    // passed targetClass.isInstance.
+    @SuppressWarnings("unchecked")
+    private <T> T interpretValueAsType(Object obj, String key, Class targetClass, String wrapperName, boolean required, boolean instantiateIfNull) {
+        if (obj == null) {
+            if (required) {
+                addValidationIssue(key, wrapperName, "it is a required property but was not found");
+            } else if (instantiateIfNull) {
+                try {
+                    return (T) targetClass.newInstance();
+                } catch (InstantiationException | IllegalAccessException e) {
+                    addValidationIssue(key, wrapperName, "no value was given, and it is supposed to be created with default values as a default, and when attempting to create it the following " +
+                            "exception was thrown:" + e.getMessage());
+                }
+            }
+        } else if (obj instanceof Map) {
+            try {
+                Constructor<?> constructor = targetClass.getConstructor(Map.class);
+                return (T) constructor.newInstance((Map) obj);
+            } catch (InvocationTargetException e) {
+                // The wrapper's own message is normally null; report what the constructor threw.
+                Throwable thrown = e.getCause() != null ? e.getCause() : e;
+                addValidationIssue(key, wrapperName, "it is found as a map and when attempting to interpret it the following exception was thrown:" + thrown.getMessage());
+            } catch (NoSuchMethodException | IllegalAccessException | InstantiationException e) {
+                addValidationIssue(key, wrapperName, "it is found as a map and when attempting to interpret it the following exception was thrown:" + e.getMessage());
+            }
+        } else if (targetClass.isInstance(obj)) {
+            return (T) obj;
+        } else {
+            // An unchecked cast cannot fail here because of erasure, so the type has to be
+            // checked explicitly to report the problem instead of failing later at the caller.
+            addValidationIssue(key, wrapperName, "it is found but could not be parsed as a map");
+        }
+        return null;
+    }
+
+    /**
+     * @return the map representation of this schema
+     */
+    public abstract Map<String, Object> toMap();
+
+    /**
+     * Puts the map representation of {@code schema} under {@code key}; does nothing for null.
+     */
+    public static void putIfNotNull(Map valueMap, String key, BaseSchema schema) {
+        if (schema != null) {
+            valueMap.put(key, schema.toMap());
+        }
+    }
+
+    /**
+     * Puts the map representations of all schemas in {@code list} under {@code key} as a
+     * list; does nothing when the list itself is null.
+     */
+    public static void putListIfNotNull(Map valueMap, String key, List<? extends BaseSchema> list) {
+        if (list != null) {
+            valueMap.put(key, list.stream().map(BaseSchema::toMap).collect(Collectors.toList()));
+        }
+    }
+
+    /** @return the given list, or an immutable empty list when it is null */
+    public static <T> List<T> nullToEmpty(List<T> list) {
+        return list == null ? Collections.emptyList() : list;
+    }
+
+    /** @return the given set, or an immutable empty set when it is null */
+    public static <T> Set<T> nullToEmpty(Set<T> set) {
+        return set == null ? Collections.emptySet() : set;
+    }
+
+    /** @return the given map, or an immutable empty map when it is null */
+    public static <K, V> Map<K, V> nullToEmpty(Map<K, V> map) {
+        return map == null ? Collections.emptyMap() : map;
+    }
+
+    /** @return true when the string is null or has length zero */
+    public static boolean isNullOrEmpty(final String string) {
+        return string == null || string.isEmpty();
+    }
+}