You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/11/08 20:07:41 UTC
[2/3] nifi-minifi git commit: MINIFI-107 - Process group support
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java
index 048027c..6f2ff8e 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/ProcessorSchema.java
@@ -26,7 +26,6 @@ import java.util.Map;
import java.util.TreeMap;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.MAX_CONCURRENT_TASKS_KEY;
-import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.PROCESSORS_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_PERIOD_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_STRATEGY_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.YIELD_PERIOD_KEY;
@@ -59,22 +58,23 @@ public class ProcessorSchema extends BaseSchemaWithIdAndName {
private String annotationData = "";
public ProcessorSchema(Map map) {
- super(map, PROCESSORS_KEY);
- processorClass = getRequiredKeyAsType(map, CLASS_KEY, String.class, PROCESSORS_KEY);
- schedulingStrategy = getRequiredKeyAsType(map, SCHEDULING_STRATEGY_KEY, String.class, PROCESSORS_KEY);
+ super(map, "Processor(id: {id}, name: {name})");
+ String wrapperName = getWrapperName();
+ processorClass = getRequiredKeyAsType(map, CLASS_KEY, String.class, wrapperName);
+ schedulingStrategy = getRequiredKeyAsType(map, SCHEDULING_STRATEGY_KEY, String.class, wrapperName);
if (schedulingStrategy != null && !isSchedulingStrategy(schedulingStrategy)) {
- addValidationIssue(SCHEDULING_STRATEGY_KEY, PROCESSORS_KEY, IT_IS_NOT_A_VALID_SCHEDULING_STRATEGY);
+ addValidationIssue(SCHEDULING_STRATEGY_KEY, wrapperName, IT_IS_NOT_A_VALID_SCHEDULING_STRATEGY);
}
- schedulingPeriod = getRequiredKeyAsType(map, SCHEDULING_PERIOD_KEY, String.class, PROCESSORS_KEY);
+ schedulingPeriod = getRequiredKeyAsType(map, SCHEDULING_PERIOD_KEY, String.class, wrapperName);
- maxConcurrentTasks = getOptionalKeyAsType(map, MAX_CONCURRENT_TASKS_KEY, Number.class, PROCESSORS_KEY, DEFAULT_MAX_CONCURRENT_TASKS);
- penalizationPeriod = getOptionalKeyAsType(map, PENALIZATION_PERIOD_KEY, String.class, PROCESSORS_KEY, DEFAULT_PENALIZATION_PERIOD);
- yieldPeriod = getOptionalKeyAsType(map, YIELD_PERIOD_KEY, String.class, PROCESSORS_KEY, DEFAULT_YIELD_DURATION);
- runDurationNanos = getOptionalKeyAsType(map, RUN_DURATION_NANOS_KEY, Number.class, PROCESSORS_KEY, DEFAULT_RUN_DURATION_NANOS);
- autoTerminatedRelationshipsList = getOptionalKeyAsType(map, AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY, List.class, PROCESSORS_KEY, DEFAULT_AUTO_TERMINATED_RELATIONSHIPS_LIST);
- properties = getOptionalKeyAsType(map, PROCESSOR_PROPS_KEY, Map.class, PROCESSORS_KEY, DEFAULT_PROPERTIES);
+ maxConcurrentTasks = getOptionalKeyAsType(map, MAX_CONCURRENT_TASKS_KEY, Number.class, wrapperName, DEFAULT_MAX_CONCURRENT_TASKS);
+ penalizationPeriod = getOptionalKeyAsType(map, PENALIZATION_PERIOD_KEY, String.class, wrapperName, DEFAULT_PENALIZATION_PERIOD);
+ yieldPeriod = getOptionalKeyAsType(map, YIELD_PERIOD_KEY, String.class, wrapperName, DEFAULT_YIELD_DURATION);
+ runDurationNanos = getOptionalKeyAsType(map, RUN_DURATION_NANOS_KEY, Number.class, wrapperName, DEFAULT_RUN_DURATION_NANOS);
+ autoTerminatedRelationshipsList = getOptionalKeyAsType(map, AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY, List.class, wrapperName, DEFAULT_AUTO_TERMINATED_RELATIONSHIPS_LIST);
+ properties = getOptionalKeyAsType(map, PROCESSOR_PROPS_KEY, Map.class, wrapperName, DEFAULT_PROPERTIES);
- annotationData = getOptionalKeyAsType(map, ANNOTATION_DATA_KEY, String.class, PROCESSORS_KEY, "");
+ annotationData = getOptionalKeyAsType(map, ANNOTATION_DATA_KEY, String.class, wrapperName, "");
}
public static boolean isSchedulingStrategy(String string) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteInputPortSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteInputPortSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteInputPortSchema.java
index 6ff8648..736c63f 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteInputPortSchema.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteInputPortSchema.java
@@ -22,13 +22,9 @@ import org.apache.nifi.minifi.commons.schema.common.BaseSchemaWithIdAndName;
import java.util.Map;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.COMMENT_KEY;
-import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.INPUT_PORTS_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.MAX_CONCURRENT_TASKS_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.USE_COMPRESSION_KEY;
-/**
- *
- */
public class RemoteInputPortSchema extends BaseSchemaWithIdAndName {
public static final String DEFAULT_COMMENT = "";
public static final int DEFAULT_MAX_CONCURRENT_TASKS = 1;
@@ -39,11 +35,12 @@ public class RemoteInputPortSchema extends BaseSchemaWithIdAndName {
private Boolean useCompression = DEFAULT_USE_COMPRESSION;
public RemoteInputPortSchema(Map map) {
- super(map, INPUT_PORTS_KEY);
+ super(map, "RemoteInputPort(id: {id}, name: {name})");
+ String wrapperName = getWrapperName();
- comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, INPUT_PORTS_KEY, DEFAULT_COMMENT);
- maxConcurrentTasks = getOptionalKeyAsType(map, MAX_CONCURRENT_TASKS_KEY, Number.class, INPUT_PORTS_KEY, DEFAULT_MAX_CONCURRENT_TASKS);
- useCompression = getOptionalKeyAsType(map, USE_COMPRESSION_KEY, Boolean.class, INPUT_PORTS_KEY, DEFAULT_USE_COMPRESSION);
+ comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, wrapperName, DEFAULT_COMMENT);
+ maxConcurrentTasks = getOptionalKeyAsType(map, MAX_CONCURRENT_TASKS_KEY, Number.class, wrapperName, DEFAULT_MAX_CONCURRENT_TASKS);
+ useCompression = getOptionalKeyAsType(map, USE_COMPRESSION_KEY, Boolean.class, wrapperName, DEFAULT_USE_COMPRESSION);
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java
index c1d318e..6d2bb20 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/RemoteProcessingGroupSchema.java
@@ -18,6 +18,7 @@
package org.apache.nifi.minifi.commons.schema;
import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
+import org.apache.nifi.minifi.commons.schema.common.StringUtil;
import org.apache.nifi.minifi.commons.schema.common.WritableSchema;
import java.util.List;
@@ -29,9 +30,6 @@ import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NA
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.REMOTE_PROCESSING_GROUPS_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.YIELD_PERIOD_KEY;
-/**
- *
- */
public class RemoteProcessingGroupSchema extends BaseSchema implements WritableSchema {
public static final String URL_KEY = "url";
public static final String TIMEOUT_KEY = "timeout";
@@ -50,17 +48,18 @@ public class RemoteProcessingGroupSchema extends BaseSchema implements WritableS
public RemoteProcessingGroupSchema(Map map) {
name = getRequiredKeyAsType(map, NAME_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY);
- url = getRequiredKeyAsType(map, URL_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY);
- inputPorts = convertListToType(getRequiredKeyAsType(map, INPUT_PORTS_KEY, List.class, REMOTE_PROCESSING_GROUPS_KEY), "input port", RemoteInputPortSchema.class, INPUT_PORTS_KEY);
+ String wrapperName = new StringBuilder("RemoteProcessingGroup(name: ").append(StringUtil.isNullOrEmpty(name) ? "unknown" : name).append(")").toString();
+ url = getRequiredKeyAsType(map, URL_KEY, String.class, wrapperName);
+ inputPorts = convertListToType(getRequiredKeyAsType(map, INPUT_PORTS_KEY, List.class, wrapperName), "input port", RemoteInputPortSchema.class, INPUT_PORTS_KEY);
if (inputPorts != null) {
for (RemoteInputPortSchema remoteInputPortSchema: inputPorts) {
addIssuesIfNotNull(remoteInputPortSchema);
}
}
- comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY, DEFAULT_COMMENT);
- timeout = getOptionalKeyAsType(map, TIMEOUT_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY, DEFAULT_TIMEOUT);
- yieldPeriod = getOptionalKeyAsType(map, YIELD_PERIOD_KEY, String.class, REMOTE_PROCESSING_GROUPS_KEY, DEFAULT_YIELD_PERIOD);
+ comment = getOptionalKeyAsType(map, COMMENT_KEY, String.class, wrapperName, DEFAULT_COMMENT);
+ timeout = getOptionalKeyAsType(map, TIMEOUT_KEY, String.class, wrapperName, DEFAULT_TIMEOUT);
+ yieldPeriod = getOptionalKeyAsType(map, YIELD_PERIOD_KEY, String.class, wrapperName, DEFAULT_YIELD_PERIOD);
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java
index 7ba322a..7cd82f7 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchema.java
@@ -17,11 +17,12 @@
package org.apache.nifi.minifi.commons.schema.common;
+import org.apache.nifi.minifi.commons.schema.exception.SchemaInstantiatonException;
+
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -29,6 +30,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -138,6 +140,40 @@ public abstract class BaseSchema implements Schema {
return result;
}
+ public <T> List<T> getOptionalKeyAsList(Map valueMap, String key, Function<Map, T> conversionFunction, String wrapperName) {
+ return convertListToType(Map.class, (List<Map>) valueMap.get(key), key, conversionFunction, wrapperName, null);
+ }
+
+ public <InputT, OutputT> List<OutputT> convertListToType(Class<InputT> inputType, List<InputT> list, String simpleListType, Function<InputT, OutputT> conversionFunction,
+ String wrapperName, Supplier<OutputT> instantiator) {
+ if (list == null) {
+ return new ArrayList<>();
+ }
+ List<OutputT> result = new ArrayList<>(list.size());
+ for (int i = 0; i < list.size(); i++) {
+ try {
+ OutputT val = interpretValueAsType(inputType, list.get(i), conversionFunction, instantiator);
+ if (val != null) {
+ result.add(val);
+ }
+ } catch (SchemaInstantiatonException e) {
+ addValidationIssue(simpleListType + " number " + i, wrapperName, e.getMessage());
+ }
+ }
+ return result;
+ }
+
+ private <InputT, OutputT> OutputT interpretValueAsType(Class<InputT> inputType, InputT input, Function<InputT, OutputT> conversionFunction, Supplier<OutputT> instantiator)
+ throws SchemaInstantiatonException {
+ if (input == null && instantiator != null) {
+ return instantiator.get();
+ }
+ if (!inputType.isInstance(input)) {
+ throw new SchemaInstantiatonException("was expecting object of type " + inputType + " but was " + input.getClass());
+ }
+ return conversionFunction.apply(input);
+ }
+
private <T> T interpretValueAsType(Object obj, String key, Class targetClass, String wrapperName, boolean required, boolean instantiateIfNull) {
if (obj == null) {
if (required){
@@ -182,20 +218,6 @@ public abstract class BaseSchema implements Schema {
}
}
- public static <T> List<T> nullToEmpty(List<T> list) {
- return list == null ? Collections.emptyList() : list;
- }
-
- public static <T> Set<T> nullToEmpty(Set<T> set) {
- return set == null ? Collections.emptySet() : set;
- }
-
- public static <K, V> Map<K, V> nullToEmpty(Map<K, V> map) {
- return map == null ? Collections.emptyMap() : map;
- }
-
-
-
public static void checkForDuplicates(Consumer<String> duplicateMessageConsumer, String errorMessagePrefix, List<String> strings) {
if (strings != null) {
Set<String> seen = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java
index 9ab6718..a1f7bb5 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/BaseSchemaWithIdAndName.java
@@ -28,26 +28,22 @@ import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NA
public abstract class BaseSchemaWithIdAndName extends BaseSchema implements WritableSchema {
public static final Pattern VALID_ID_PATTERN = Pattern.compile("[A-Za-z0-9_-]+");
- public static final String ID_DOES_NOT_MATCH_VALID_ID_PATTERN = "Id does not match valid pattern (" + VALID_ID_PATTERN + "): ";
+ public static final String ID_DOES_NOT_MATCH_VALID_ID_PATTERN = "Id does not match valid pattern (" + VALID_ID_PATTERN + ")";
private final String wrapperName;
private String id;
private String name;
public BaseSchemaWithIdAndName(Map map, String wrapperName) {
- id = getId(map, wrapperName);
- name = getName(map, wrapperName);
this.wrapperName = wrapperName;
+ id = getId(map, getWrapperName());
+ name = getOptionalKeyAsType(map, NAME_KEY, String.class, getWrapperName(), "");
}
protected String getId(Map map, String wrapperName) {
return getOptionalKeyAsType(map, ID_KEY, String.class, wrapperName, "");
}
- protected String getName(Map map, String wrapperName) {
- return getOptionalKeyAsType(map, NAME_KEY, String.class, wrapperName, "");
- }
-
public String getId() {
return id;
}
@@ -60,10 +56,14 @@ public abstract class BaseSchemaWithIdAndName extends BaseSchema implements Writ
return name;
}
- protected void setName(String name) {
+ public void setName(String name) {
this.name = name;
}
+ public String getWrapperName() {
+ return wrapperName.replace("{id}", StringUtil.isNullOrEmpty(id) ? "unkown" : id).replace("{name}", StringUtil.isNullOrEmpty(name) ? "unkown" : name);
+ }
+
@Override
public Map<String, Object> toMap() {
Map<String, Object> map = mapSupplier.get();
@@ -76,9 +76,9 @@ public abstract class BaseSchemaWithIdAndName extends BaseSchema implements Writ
public List<String> getValidationIssues() {
List<String> validationIssues = super.getValidationIssues();
if (StringUtil.isNullOrEmpty(id)) {
- validationIssues.add(getIssueText(CommonPropertyKeys.ID_KEY, wrapperName, IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED));
+ validationIssues.add(getIssueText(CommonPropertyKeys.ID_KEY, getWrapperName(), IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED));
} else if (!VALID_ID_PATTERN.matcher(id).matches()) {
- validationIssues.add(ID_DOES_NOT_MATCH_VALID_ID_PATTERN + id);
+ validationIssues.add(getIssueText(CommonPropertyKeys.ID_KEY, getWrapperName(), ID_DOES_NOT_MATCH_VALID_ID_PATTERN));
}
return validationIssues;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CollectionUtil.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CollectionUtil.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CollectionUtil.java
new file mode 100644
index 0000000..5fb9549
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CollectionUtil.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.nifi.minifi.commons.schema.common;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class CollectionUtil {
+ public static <T> List<T> nullToEmpty(List<T> list) {
+ return list == null ? Collections.emptyList() : list;
+ }
+
+ public static <T> Set<T> nullToEmpty(Set<T> set) {
+ return set == null ? Collections.emptySet() : set;
+ }
+
+ public static <K, V> Map<K, V> nullToEmpty(Map<K, V> map) {
+ return map == null ? Collections.emptyMap() : map;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java
index 5c3a432..a603f3e 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/CommonPropertyKeys.java
@@ -31,6 +31,7 @@ public class CommonPropertyKeys {
public static final String PROVENANCE_REPORTING_KEY = "Provenance Reporting";
public static final String REMOTE_PROCESSING_GROUPS_KEY = "Remote Processing Groups";
public static final String INPUT_PORTS_KEY = "Input Ports";
+ public static final String OUTPUT_PORTS_KEY = "Output Ports";
public static final String PROVENANCE_REPO_KEY = "Provenance Repository";
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/StringUtil.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/StringUtil.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/StringUtil.java
index ae7165e..68b4cc7 100644
--- a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/StringUtil.java
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/common/StringUtil.java
@@ -17,8 +17,40 @@
package org.apache.nifi.minifi.commons.schema.common;
+import java.util.function.Consumer;
+
public class StringUtil {
- public static boolean isNullOrEmpty(final String string) {
+ /**
+ * Returns true if the string is null or empty
+ *
+ * @param string the string
+ * @return true if the string is null or empty
+ */
+ public static boolean isNullOrEmpty(String string) {
return string == null || string.isEmpty();
}
+
+ /**
+ * Passes the string to the consumer if it is neither null nor empty
+ *
+ * @param string the input
+ * @param consumer the action to perform
+ */
+ public static void doIfNotNullOrEmpty(String string, Consumer<String> consumer) {
+ if (!isNullOrEmpty(string)) {
+ consumer.accept(string);
+ }
+ }
+
+ /**
+ * Passes the string to the consumer if it is either null nor empty
+ *
+ * @param string the input
+ * @param consumer the action to perform
+ */
+ public static void doIfNullOrEmpty(String string, Consumer<String> consumer) {
+ if (isNullOrEmpty(string)) {
+ consumer.accept(string);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/exception/SchemaInstantiatonException.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/exception/SchemaInstantiatonException.java b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/exception/SchemaInstantiatonException.java
new file mode 100644
index 0000000..d659df8
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema/exception/SchemaInstantiatonException.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.nifi.minifi.commons.schema.exception;
+
+public class SchemaInstantiatonException extends RuntimeException {
+ public SchemaInstantiatonException(String message) {
+ super(message);
+ }
+
+ public SchemaInstantiatonException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchemaTest.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchemaTest.java b/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchemaTest.java
new file mode 100644
index 0000000..53cf32a
--- /dev/null
+++ b/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/ProcessGroupSchemaTest.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.nifi.minifi.commons.schema;
+
+import org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+public class ProcessGroupSchemaTest {
+ @Test
+ public void testNoPortsRootGroup() {
+ validateIssuesNumMatches(0, new ProcessGroupSchema(new HashMap<>(), ConfigSchema.TOP_LEVEL_NAME));
+ }
+
+ @Test
+ public void testInputPortsRootGroup() {
+ Map<String, Object> map = new HashMap<>();
+ map.put(CommonPropertyKeys.INPUT_PORTS_KEY, Arrays.asList(createPortSchema("testId", "testName", ConfigSchema.TOP_LEVEL_NAME).toMap()));
+ validateIssuesNumMatches(1, new ProcessGroupSchema(map, ConfigSchema.TOP_LEVEL_NAME));
+ }
+
+ @Test
+ public void testOutputPortsRootGroup() {
+ Map<String, Object> map = new HashMap<>();
+ map.put(CommonPropertyKeys.OUTPUT_PORTS_KEY, Arrays.asList(createPortSchema("testId", "testName", ConfigSchema.TOP_LEVEL_NAME).toMap()));
+ validateIssuesNumMatches(1, new ProcessGroupSchema(map, ConfigSchema.TOP_LEVEL_NAME));
+ }
+
+ private PortSchema createPortSchema(String id, String name, String wrapperName) {
+ Map<String, Object> map = new HashMap<>();
+ map.put(CommonPropertyKeys.ID_KEY, id);
+ map.put(CommonPropertyKeys.NAME_KEY, name);
+ return new PortSchema(map, wrapperName);
+ }
+
+ private void validateIssuesNumMatches(int expected, ProcessGroupSchema processGroupSchema) {
+ int actual = processGroupSchema.getValidationIssues().size();
+ String issues = "[" + System.lineSeparator() + processGroupSchema.getValidationIssues().stream().collect(Collectors.joining("," + System.lineSeparator())) + "]";
+ assertEquals("Expected " + expected + " issue(s), got " + actual + ": " + issues, expected, actual);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/serialization/SchemaLoaderTest.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/serialization/SchemaLoaderTest.java b/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/serialization/SchemaLoaderTest.java
index 5b602ac..928bc03 100644
--- a/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/serialization/SchemaLoaderTest.java
+++ b/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/serialization/SchemaLoaderTest.java
@@ -79,12 +79,12 @@ public class SchemaLoaderTest {
private void validateMinimalConfigVersion1Parse(ConfigSchema configSchema) {
assertTrue(configSchema instanceof ConfigSchema);
- List<ConnectionSchema> connections = configSchema.getConnections();
+ List<ConnectionSchema> connections = configSchema.getProcessGroupSchema().getConnections();
assertNotNull(connections);
assertEquals(1, connections.size());
assertNotNull(connections.get(0).getId());
- List<ProcessorSchema> processors = configSchema.getProcessors();
+ List<ProcessorSchema> processors = configSchema.getProcessGroupSchema().getProcessors();
assertNotNull(processors);
assertEquals(2, processors.size());
processors.forEach(p -> assertNotNull(p.getId()));
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v1/ConfigSchemaV1Test.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v1/ConfigSchemaV1Test.java b/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v1/ConfigSchemaV1Test.java
index 56a8103..7ce0587 100644
--- a/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v1/ConfigSchemaV1Test.java
+++ b/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v1/ConfigSchemaV1Test.java
@@ -69,8 +69,10 @@ public class ConfigSchemaV1Test {
ConfigSchema configSchema = new ConfigSchemaV1(yamlAsMap).convert();
List<String> validationIssues = configSchema.getValidationIssues();
assertEquals(4, validationIssues.size());
- assertEquals(BaseSchema.getIssueText(ConnectionSchema.DESTINATION_ID_KEY, CommonPropertyKeys.CONNECTIONS_KEY, BaseSchema.IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED), validationIssues.get(0));
- assertEquals(BaseSchema.getIssueText(ConnectionSchema.SOURCE_ID_KEY, CommonPropertyKeys.CONNECTIONS_KEY, BaseSchema.IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED), validationIssues.get(1));
+ assertEquals(BaseSchema.getIssueText(ConnectionSchema.DESTINATION_ID_KEY, "Connection(id: TailToSplit, name: TailToSplit)", BaseSchema.IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED),
+ validationIssues.get(0));
+ assertEquals(BaseSchema.getIssueText(ConnectionSchema.SOURCE_ID_KEY, "Connection(id: TailToSplit, name: TailToSplit)", BaseSchema.IT_WAS_NOT_FOUND_AND_IT_IS_REQUIRED),
+ validationIssues.get(1));
assertEquals(ConfigSchemaV1.CONNECTION_WITH_NAME + connection.get(NAME_KEY) + ConfigSchemaV1.HAS_INVALID_DESTINATION_NAME + fakeDestination, validationIssues.get(2));
assertEquals(ConfigSchemaV1.CONNECTION_WITH_NAME + connection.get(NAME_KEY) + ConfigSchemaV1.HAS_INVALID_SOURCE_NAME + fakeSource, validationIssues.get(3));
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v1/ConnectionSchemaV1Test.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v1/ConnectionSchemaV1Test.java b/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v1/ConnectionSchemaV1Test.java
index e9acf4a..28b000c 100644
--- a/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v1/ConnectionSchemaV1Test.java
+++ b/minifi-commons/minifi-commons-schema/src/test/java/org/apache/nifi/minifi/commons/schema/v1/ConnectionSchemaV1Test.java
@@ -179,7 +179,7 @@ public class ConnectionSchemaV1Test {
ConfigSchema configSchema = new ConfigSchemaV1(Collections.singletonMap(CommonPropertyKeys.CONNECTIONS_KEY, listWithKeyValues)).convert();
assertMessageDoesNotExist(configSchema, ConfigSchema.FOUND_THE_FOLLOWING_DUPLICATE_CONNECTION_IDS);
- List<ConnectionSchema> connections = configSchema.getConnections();
+ List<ConnectionSchema> connections = configSchema.getProcessGroupSchema().getConnections();
assertEquals(5, connections.size());
// Generated unique ids
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-docs/src/main/markdown/System_Admin_Guide.md
----------------------------------------------------------------------
diff --git a/minifi-docs/src/main/markdown/System_Admin_Guide.md b/minifi-docs/src/main/markdown/System_Admin_Guide.md
index 38c9588..0836443 100644
--- a/minifi-docs/src/main/markdown/System_Admin_Guide.md
+++ b/minifi-docs/src/main/markdown/System_Admin_Guide.md
@@ -248,7 +248,7 @@ parses and upconverts to the current version without issue.
1. Use ids instead of names for processors, connections.
2. Allow multiple source relationships for connections.
-
+3. Support process groups, input ports, output ports
## Flow Controller
@@ -391,6 +391,39 @@ Within the Processor Configuration section, there is the `Properties` subsection
State File: ./conf/state/tail-file
Initial Start Position: Beginning of File
+## Process Groups
+
+Process groups can be nested from the top level. They can contain other process groups as well and can be used to logically group related operations.
+
+*Property* | *Description*
+----------------------------------- | -------------
+name | The name of what this process group will do.
+id | The id of this process group. This needs to be set to a unique filesystem-friendly value (regex: [A-Za-z0-9_-]+)
+Processors | The processors contained in this Process Group. (Defined above)
+Remote Processing Groups | The remote processing groups contained in this Process Group. (Defined below)
+Connections | The connections contained in this Process Group. (Defined below)
+Input Ports | The input ports contained in this Process Group. (Defined below)
+Output Ports | The output ports contained in this Process Group. (Defined below)
+Process Groups | The child Process Groups contained in this Process Group.
+
+## Input Ports
+
+These ports provide input to the Process Group they reside on. (Currently only for internal Input ports.)
+
+*Property* | *Description*
+-------------------- | -------------
+name | The name of what this input port will do.
+id | The id of this input port. This needs to be set to a unique filesystem-friendly value (regex: [A-Za-z0-9_-]+)
+
+## Output Ports
+
+These ports provide output from the Process Group they reside on. (Currently only for internal Output ports.)
+
+*Property* | *Description*
+-------------------- | -------------
+name | The name of what this output port will do.
+id | The id of this output port. This needs to be set to a unique filesystem-friendly value (regex: [A-Za-z0-9_-]+)
+
## Connections
There can be multiple connections in this version of MiNiFi. The "Connections" subsection is a list of connections. Each connection must specify these properties.
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/config.yml
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/config.yml b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/config.yml
index 44d8d4e..c88e47e 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/config.yml
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/config.yml
@@ -56,5 +56,8 @@ Security Properties:
algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
provider: BC
Processors: []
+Process Groups: []
+Input Ports: []
+Output Ports: []
Connections: []
Remote Processing Groups: []
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMain.java
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMain.java b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMain.java
index 251e9a3..e62392d 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMain.java
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMain.java
@@ -28,6 +28,8 @@ import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.NiFiComponentDTO;
+import org.apache.nifi.web.api.dto.PortDTO;
+import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
@@ -42,14 +44,19 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.nifi.minifi.commons.schema.common.CollectionUtil.nullToEmpty;
public class ConfigMain {
public static final int ERR_INVALID_ARGS = 1;
@@ -134,80 +141,82 @@ public class ConfigMain {
System.out.println();
}
- private static void enrichTemplateDTO(TemplateDTO templateDTO) {
- FlowSnippetDTO flowSnippetDTO = templateDTO.getSnippet();
+ private static void enrichFlowSnippetDTO(FlowSnippetDTO flowSnippetDTO) {
+ List<FlowSnippetDTO> allFlowSnippets = getAllFlowSnippets(flowSnippetDTO);
- Set<RemoteProcessGroupDTO> remoteProcessGroups = flowSnippetDTO.getRemoteProcessGroups();
- if (remoteProcessGroups != null) {
- for (RemoteProcessGroupDTO remoteProcessGroupDTO : remoteProcessGroups) {
- if (StringUtil.isNullOrEmpty(remoteProcessGroupDTO.getName())) {
- remoteProcessGroupDTO.setName(remoteProcessGroupDTO.getTargetUri());
- }
- }
+ Set<RemoteProcessGroupDTO> remoteProcessGroups = getAll(allFlowSnippets, FlowSnippetDTO::getRemoteProcessGroups).collect(Collectors.toSet());
+
+ // RPGs with no name get Target URI as name
+ remoteProcessGroups.stream().filter(r -> StringUtil.isNullOrEmpty(r.getName())).forEach(r -> r.setName(r.getTargetUri()));
+
+ Map<String, String> connectableNameMap = getAll(allFlowSnippets, FlowSnippetDTO::getProcessors).collect(Collectors.toMap(NiFiComponentDTO::getId, ProcessorDTO::getName));
+
+ for (RemoteProcessGroupDTO remoteProcessGroupDTO : remoteProcessGroups) {
+ RemoteProcessGroupContentsDTO contents = remoteProcessGroupDTO.getContents();
+ addConnectables(connectableNameMap, nullToEmpty(contents.getInputPorts()), RemoteProcessGroupPortDTO::getId, RemoteProcessGroupPortDTO::getId);
+ addConnectables(connectableNameMap, nullToEmpty(contents.getOutputPorts()), RemoteProcessGroupPortDTO::getId, RemoteProcessGroupPortDTO::getId);
}
- Set<ConnectionDTO> connections = flowSnippetDTO.getConnections();
- if (connections != null) {
- Map<String, String> connectableNameMap = new HashMap<>();
- Set<ProcessorDTO> processorDTOs = flowSnippetDTO.getProcessors();
- if (processorDTOs != null) {
- connectableNameMap.putAll(processorDTOs.stream().collect(Collectors.toMap(NiFiComponentDTO::getId, ProcessorDTO::getName)));
- }
- if (remoteProcessGroups != null) {
- for (RemoteProcessGroupDTO remoteProcessGroupDTO : remoteProcessGroups) {
- RemoteProcessGroupContentsDTO contents = remoteProcessGroupDTO.getContents();
- addRemoteProcessGroupPortDTOs(connectableNameMap, contents.getInputPorts());
- addRemoteProcessGroupPortDTOs(connectableNameMap, contents.getOutputPorts());
+ addConnectables(connectableNameMap, getAll(allFlowSnippets, FlowSnippetDTO::getInputPorts).collect(Collectors.toList()), PortDTO::getId, PortDTO::getName);
+ addConnectables(connectableNameMap, getAll(allFlowSnippets, FlowSnippetDTO::getOutputPorts).collect(Collectors.toList()), PortDTO::getId, PortDTO::getName);
+
+ Set<ConnectionDTO> connections = getAll(allFlowSnippets, FlowSnippetDTO::getConnections).collect(Collectors.toSet());
+ for (ConnectionDTO connection : connections) {
+ setName(connectableNameMap, connection.getSource());
+ setName(connectableNameMap, connection.getDestination());
+ }
+
+ for (ConnectionDTO connection : connections) {
+ if (StringUtil.isNullOrEmpty(connection.getName())) {
+ StringBuilder name = new StringBuilder();
+ ConnectableDTO connectionSource = connection.getSource();
+ if (connectionSource != null) {
+ name.append(connectionSource.getName());
}
- }
- for (ConnectionDTO connection : connections) {
- setName(connectableNameMap, connection.getSource());
- setName(connectableNameMap, connection.getDestination());
- }
- for (ConnectionDTO connection : connections) {
- if (StringUtil.isNullOrEmpty(connection.getName())) {
- StringBuilder name = new StringBuilder();
- ConnectableDTO connectionSource = connection.getSource();
- if (connectionSource != null) {
- name.append(connectionSource.getName());
- }
- name.append("/");
- if (connection.getSelectedRelationships() != null && connection.getSelectedRelationships().size() > 0) {
- name.append(connection.getSelectedRelationships().iterator().next());
- }
- name.append("/");
- ConnectableDTO connectionDestination = connection.getDestination();
- if (connectionDestination != null) {
- name.append(connectionDestination.getName());
- }
- connection.setName(name.toString());
+ name.append("/");
+ if (connection.getSelectedRelationships() != null && connection.getSelectedRelationships().size() > 0) {
+ name.append(connection.getSelectedRelationships().iterator().next());
+ }
+ name.append("/");
+ ConnectableDTO connectionDestination = connection.getDestination();
+ if (connectionDestination != null) {
+ name.append(connectionDestination.getName());
}
+ connection.setName(name.toString());
}
}
+ nullToEmpty(flowSnippetDTO.getProcessGroups()).stream().map(ProcessGroupDTO::getContents).forEach(ConfigMain::enrichFlowSnippetDTO);
}
- public static ConfigSchema transformTemplateToSchema(InputStream source) throws JAXBException, SchemaLoaderException {
- TemplateDTO templateDTO = (TemplateDTO) JAXBContext.newInstance(TemplateDTO.class).createUnmarshaller().unmarshal(source);
+ private static <T> Stream<T> getAll(List<FlowSnippetDTO> allFlowSnippets, Function<FlowSnippetDTO, Collection<T>> accessor) {
+ return allFlowSnippets.stream().flatMap(f -> accessor.apply(f).stream()).filter(Objects::nonNull);
+ }
- if (templateDTO.getSnippet().getProcessGroups().size() != 0){
- throw new SchemaLoaderException("Process Groups are not currently supported in MiNiFi. Please remove any from the template and try again.");
- }
+ private static List<FlowSnippetDTO> getAllFlowSnippets(FlowSnippetDTO flowSnippetDTO) {
+ List<FlowSnippetDTO> result = new ArrayList<>();
+ getAllFlowSnippets(flowSnippetDTO, result);
+ return result;
+ }
- if (templateDTO.getSnippet().getOutputPorts().size() != 0){
- throw new SchemaLoaderException("Output Ports are not currently supported in MiNiFi. Please remove any from the template and try again.");
- }
+ private static void getAllFlowSnippets(FlowSnippetDTO flowSnippetDTO, List<FlowSnippetDTO> result) {
+ result.add(flowSnippetDTO);
+ nullToEmpty(flowSnippetDTO.getProcessGroups()).stream().map(ProcessGroupDTO::getContents).forEach(f -> getAllFlowSnippets(f, result));
+ }
- if (templateDTO.getSnippet().getInputPorts().size() != 0){
- throw new SchemaLoaderException("Input Ports are not currently supported in MiNiFi. Please remove any from the template and try again.");
- }
+ public static ConfigSchema transformTemplateToSchema(InputStream source) throws JAXBException, IOException, SchemaLoaderException {
+ try {
+ TemplateDTO templateDTO = (TemplateDTO) JAXBContext.newInstance(TemplateDTO.class).createUnmarshaller().unmarshal(source);
- if (templateDTO.getSnippet().getFunnels().size() != 0){
- throw new SchemaLoaderException("Funnels are not currently supported in MiNiFi. Please remove any from the template and try again.");
- }
+ if (templateDTO.getSnippet().getFunnels().size() != 0){
+ throw new SchemaLoaderException("Funnels are not currently supported in MiNiFi. Please remove any from the template and try again.");
+ }
- enrichTemplateDTO(templateDTO);
- ConfigSchema configSchema = new ConfigSchemaFunction().apply(templateDTO);
- return configSchema;
+ enrichFlowSnippetDTO(templateDTO.getSnippet());
+ ConfigSchema configSchema = new ConfigSchemaFunction().apply(templateDTO);
+ return configSchema;
+ } finally {
+ source.close();
+ }
}
private static void setName(Map<String, String> connectableNameMap, ConnectableDTO connectableDTO) {
@@ -219,10 +228,6 @@ public class ConfigMain {
}
}
- private static void addRemoteProcessGroupPortDTOs(Map<String, String> connectableNameMap, Collection<RemoteProcessGroupPortDTO> ports) {
- addConnectables(connectableNameMap, ports, RemoteProcessGroupPortDTO::getId, RemoteProcessGroupPortDTO::getId);
- }
-
private static <T> void addConnectables(Map<String, String> connectableNameMap, Collection<T> hasIdAndNames, Function<T, String> idGetter, Function<T, String> nameGetter) {
if (hasIdAndNames != null) {
for (T hasIdAndName : hasIdAndNames) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java
index 4fc9f5a..9cdccf5 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConfigSchemaFunction.java
@@ -19,10 +19,12 @@ package org.apache.nifi.minifi.toolkit.configuration.dto;
import org.apache.nifi.minifi.commons.schema.ConfigSchema;
import org.apache.nifi.minifi.commons.schema.ConnectionSchema;
+import org.apache.nifi.minifi.commons.schema.PortSchema;
+import org.apache.nifi.minifi.commons.schema.ProcessGroupSchema;
import org.apache.nifi.minifi.commons.schema.ProcessorSchema;
import org.apache.nifi.minifi.commons.schema.RemoteProcessingGroupSchema;
-import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
import org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys;
+import org.apache.nifi.minifi.commons.schema.common.StringUtil;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.TemplateDTO;
@@ -32,51 +34,96 @@ import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static org.apache.nifi.minifi.commons.schema.common.CollectionUtil.nullToEmpty;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ID_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.INPUT_PORTS_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.OUTPUT_PORTS_KEY;
+
public class ConfigSchemaFunction implements Function<TemplateDTO, ConfigSchema> {
private final FlowControllerSchemaFunction flowControllerSchemaFunction;
private final ProcessorSchemaFunction processorSchemaFunction;
private final ConnectionSchemaFunction connectionSchemaFunction;
private final RemoteProcessingGroupSchemaFunction remoteProcessingGroupSchemaFunction;
+ private final PortSchemaFunction inputPortSchemaFunction;
+ private final PortSchemaFunction outputPortSchemaFunction;
public ConfigSchemaFunction() {
- this(new FlowControllerSchemaFunction(), new ProcessorSchemaFunction(), new ConnectionSchemaFunction(), new RemoteProcessingGroupSchemaFunction(new RemoteInputPortSchemaFunction()));
+ this(new FlowControllerSchemaFunction(), new ProcessorSchemaFunction(), new ConnectionSchemaFunction(), new RemoteProcessingGroupSchemaFunction(new RemoteInputPortSchemaFunction()),
+ new PortSchemaFunction(INPUT_PORTS_KEY), new PortSchemaFunction(OUTPUT_PORTS_KEY));
}
- public ConfigSchemaFunction(FlowControllerSchemaFunction flowControllerSchemaFunction, ProcessorSchemaFunction processorSchemaFunction,
- ConnectionSchemaFunction connectionSchemaFunction, RemoteProcessingGroupSchemaFunction remoteProcessingGroupSchemaFunction) {
+ public ConfigSchemaFunction(FlowControllerSchemaFunction flowControllerSchemaFunction, ProcessorSchemaFunction processorSchemaFunction, ConnectionSchemaFunction connectionSchemaFunction,
+ RemoteProcessingGroupSchemaFunction remoteProcessingGroupSchemaFunction, PortSchemaFunction inputPortSchemaFunction, PortSchemaFunction outputPortSchemaFunction) {
this.flowControllerSchemaFunction = flowControllerSchemaFunction;
this.processorSchemaFunction = processorSchemaFunction;
this.connectionSchemaFunction = connectionSchemaFunction;
this.remoteProcessingGroupSchemaFunction = remoteProcessingGroupSchemaFunction;
+ this.inputPortSchemaFunction = inputPortSchemaFunction;
+ this.outputPortSchemaFunction = outputPortSchemaFunction;
}
@Override
public ConfigSchema apply(TemplateDTO templateDTO) {
Map<String, Object> map = new HashMap<>();
+ map.put(CommonPropertyKeys.FLOW_CONTROLLER_PROPS_KEY, flowControllerSchemaFunction.apply(templateDTO).toMap());
+
FlowSnippetDTO snippet = templateDTO.getSnippet();
- map.put(CommonPropertyKeys.FLOW_CONTROLLER_PROPS_KEY, flowControllerSchemaFunction.apply(templateDTO).toMap());
+ addSnippet(map, snippet);
+
+ return new ConfigSchema(map);
+ }
+
+ protected void addSnippet(Map<String, Object> map, FlowSnippetDTO snippet) {
+ addSnippet(map, null, null, snippet);
+ }
+
+ protected Map<String, Object> addSnippet(Map<String, Object> map, String id, String name, FlowSnippetDTO snippet) {
+ if (!StringUtil.isNullOrEmpty(id)) {
+ map.put(ID_KEY, id);
+ }
- map.put(CommonPropertyKeys.PROCESSORS_KEY, BaseSchema.nullToEmpty(snippet.getProcessors()).stream()
+ if (!StringUtil.isNullOrEmpty(name)) {
+ map.put(NAME_KEY, name);
+ }
+
+ map.put(CommonPropertyKeys.PROCESSORS_KEY, nullToEmpty(snippet.getProcessors()).stream()
.map(processorSchemaFunction)
.sorted(Comparator.comparing(ProcessorSchema::getName))
.map(ProcessorSchema::toMap)
.collect(Collectors.toList()));
- map.put(CommonPropertyKeys.CONNECTIONS_KEY, BaseSchema.nullToEmpty(snippet.getConnections()).stream()
+
+
+ map.put(CommonPropertyKeys.CONNECTIONS_KEY, nullToEmpty(snippet.getConnections()).stream()
.map(connectionSchemaFunction)
.sorted(Comparator.comparing(ConnectionSchema::getName))
.map(ConnectionSchema::toMap)
.collect(Collectors.toList()));
- map.put(CommonPropertyKeys.REMOTE_PROCESSING_GROUPS_KEY, BaseSchema.nullToEmpty(snippet.getRemoteProcessGroups()).stream()
+ map.put(CommonPropertyKeys.REMOTE_PROCESSING_GROUPS_KEY, nullToEmpty(snippet.getRemoteProcessGroups()).stream()
.map(remoteProcessingGroupSchemaFunction)
.sorted(Comparator.comparing(RemoteProcessingGroupSchema::getName))
.map(RemoteProcessingGroupSchema::toMap)
.collect(Collectors.toList()));
- ConfigSchema configSchema = new ConfigSchema(map);
- return configSchema;
+ map.put(INPUT_PORTS_KEY, nullToEmpty(snippet.getInputPorts()).stream()
+ .map(inputPortSchemaFunction)
+ .sorted(Comparator.comparing(PortSchema::getName))
+ .map(PortSchema::toMap)
+ .collect(Collectors.toList()));
+
+ map.put(OUTPUT_PORTS_KEY, nullToEmpty(snippet.getOutputPorts()).stream()
+ .map(outputPortSchemaFunction)
+ .sorted(Comparator.comparing(PortSchema::getName))
+ .map(PortSchema::toMap)
+ .collect(Collectors.toList()));
+
+ map.put(ProcessGroupSchema.PROCESS_GROUPS_KEY, nullToEmpty(snippet.getProcessGroups()).stream()
+ .map(p -> addSnippet(new HashMap<>(), p.getId(), p.getName(), p.getContents())).collect(Collectors.toList()));
+
+ return map;
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConnectionSchemaFunction.java
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConnectionSchemaFunction.java b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConnectionSchemaFunction.java
index d3d71e6..7acab41 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConnectionSchemaFunction.java
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ConnectionSchemaFunction.java
@@ -19,7 +19,6 @@ package org.apache.nifi.minifi.toolkit.configuration.dto;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.minifi.commons.schema.ConnectionSchema;
-import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import java.util.HashMap;
@@ -29,6 +28,7 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static org.apache.nifi.minifi.commons.schema.common.CollectionUtil.nullToEmpty;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.CONNECTIONS_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ID_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY;
@@ -40,14 +40,14 @@ public class ConnectionSchemaFunction implements Function<ConnectionDTO, Connect
map.put(ID_KEY, connectionDTO.getId());
map.put(NAME_KEY, connectionDTO.getName());
map.put(ConnectionSchema.SOURCE_ID_KEY, connectionDTO.getSource().getId());
- Set<String> selectedRelationships = BaseSchema.nullToEmpty(connectionDTO.getSelectedRelationships());
+ Set<String> selectedRelationships = nullToEmpty(connectionDTO.getSelectedRelationships());
map.put(ConnectionSchema.SOURCE_RELATIONSHIP_NAMES_KEY, selectedRelationships.stream().sorted().collect(Collectors.toList()));
map.put(ConnectionSchema.DESTINATION_ID_KEY, connectionDTO.getDestination().getId());
map.put(ConnectionSchema.MAX_WORK_QUEUE_SIZE_KEY, connectionDTO.getBackPressureObjectThreshold());
map.put(ConnectionSchema.MAX_WORK_QUEUE_DATA_SIZE_KEY, connectionDTO.getBackPressureDataSizeThreshold());
map.put(ConnectionSchema.FLOWFILE_EXPIRATION__KEY, connectionDTO.getFlowFileExpiration());
- List<String> queuePrioritizers = BaseSchema.nullToEmpty(connectionDTO.getPrioritizers());
+ List<String> queuePrioritizers = nullToEmpty(connectionDTO.getPrioritizers());
if (queuePrioritizers.size() > 0) {
map.put(ConnectionSchema.QUEUE_PRIORITIZER_CLASS_KEY, queuePrioritizers.get(0));
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/PortSchemaFunction.java
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/PortSchemaFunction.java b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/PortSchemaFunction.java
new file mode 100644
index 0000000..29efac3
--- /dev/null
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/PortSchemaFunction.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.dto;
+
+import org.apache.nifi.minifi.commons.schema.PortSchema;
+import org.apache.nifi.web.api.dto.PortDTO;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ID_KEY;
+import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY;
+
+public class PortSchemaFunction implements Function<PortDTO, PortSchema> {
+ private final String wrapperName;
+
+ public PortSchemaFunction(String wrapperName) {
+ this.wrapperName = wrapperName;
+ }
+
+ @Override
+ public PortSchema apply(PortDTO portDTO) {
+ Map<String, Object> map = new HashMap<>();
+ map.put(ID_KEY, portDTO.getId());
+ map.put(NAME_KEY, portDTO.getName());
+ return new PortSchema(map, wrapperName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ProcessorSchemaFunction.java
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ProcessorSchemaFunction.java b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ProcessorSchemaFunction.java
index 21b3345..89097cc 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ProcessorSchemaFunction.java
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/dto/ProcessorSchemaFunction.java
@@ -18,7 +18,6 @@
package org.apache.nifi.minifi.toolkit.configuration.dto;
import org.apache.nifi.minifi.commons.schema.ProcessorSchema;
-import org.apache.nifi.minifi.commons.schema.common.BaseSchema;
import org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
@@ -29,6 +28,7 @@ import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static org.apache.nifi.minifi.commons.schema.common.CollectionUtil.nullToEmpty;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.ID_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.NAME_KEY;
import static org.apache.nifi.minifi.commons.schema.common.CommonPropertyKeys.SCHEDULING_PERIOD_KEY;
@@ -53,11 +53,11 @@ public class ProcessorSchemaFunction implements Function<ProcessorDTO, Processor
if (runDurationMillis != null) {
map.put(ProcessorSchema.RUN_DURATION_NANOS_KEY, runDurationMillis * 1000);
}
- map.put(ProcessorSchema.AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY, BaseSchema.nullToEmpty(processorDTO.getRelationships()).stream()
+ map.put(ProcessorSchema.AUTO_TERMINATED_RELATIONSHIPS_LIST_KEY, nullToEmpty(processorDTO.getRelationships()).stream()
.filter(RelationshipDTO::isAutoTerminate)
.map(RelationshipDTO::getName)
.collect(Collectors.toList()));
- map.put(ProcessorSchema.PROCESSOR_PROPS_KEY, new HashMap<>(BaseSchema.nullToEmpty(processorDTOConfig.getProperties())));
+ map.put(ProcessorSchema.PROCESSOR_PROPS_KEY, new HashMap<>(nullToEmpty(processorDTOConfig.getProperties())));
String annotationData = processorDTOConfig.getAnnotationData();
if(annotationData != null && !annotationData.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java b/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java
index cfdc48f..d61a641 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/ConfigMainTest.java
@@ -148,7 +148,7 @@ public class ConfigMainTest {
@Test
public void testTransformErrorTransformingTemplate() throws FileNotFoundException {
when(pathInputStreamFactory.create(testInput)).thenAnswer(invocation ->
- ConfigMainTest.class.getClassLoader().getResourceAsStream("TemplateWithInputPort.xml"));
+ ConfigMainTest.class.getClassLoader().getResourceAsStream("TemplateWithFunnel.xml"));
assertEquals(ConfigMain.ERR_UNABLE_TO_TRANSFORM_TEMPLATE, configMain.execute(new String[]{ConfigMain.TRANSFORM, testInput, testOutput}));
}
@@ -190,18 +190,23 @@ public class ConfigMainTest {
transformRoundTrip("MultipleRelationships");
}
- @Test(expected = SchemaLoaderException.class)
- public void testFailToTransformProcessGroup() throws IOException, JAXBException, SchemaLoaderException {
+ @Test
+ public void testTransformRoundTripProcessGroupsAndRemoteProcessGroups() throws IOException, JAXBException, SchemaLoaderException {
+ transformRoundTrip("ProcessGroupsAndRemoteProcessGroups");
+ }
+
+ @Test
+ public void testSuccessTransformProcessGroup() throws IOException, JAXBException, SchemaLoaderException {
ConfigMain.transformTemplateToSchema(getClass().getClassLoader().getResourceAsStream("TemplateWithProcessGroup.xml")).toMap();
}
- @Test(expected = SchemaLoaderException.class)
- public void testFailToTransformInputPort() throws IOException, JAXBException, SchemaLoaderException {
+ @Test
+ public void testSuccessTransformInputPort() throws IOException, JAXBException, SchemaLoaderException {
ConfigMain.transformTemplateToSchema(getClass().getClassLoader().getResourceAsStream("TemplateWithOutputPort.xml")).toMap();
}
- @Test(expected = SchemaLoaderException.class)
- public void testFailToTransformOutputPort() throws IOException, JAXBException, SchemaLoaderException {
+ @Test
+ public void testSuccessTransformOutputPort() throws IOException, JAXBException, SchemaLoaderException {
ConfigMain.transformTemplateToSchema(getClass().getClassLoader().getResourceAsStream("TemplateWithInputPort.xml")).toMap();
}
@@ -278,8 +283,8 @@ public class ConfigMainTest {
assertTrue(configSchemaUpgradedFromV1.isValid());
assertEquals(configSchemaConvertableSchema, configSchemaUpgradedFromV1);
ConfigSchema configSchemaFromCurrent = new ConfigSchema(yamlMap);
- List<ProcessorSchema> currentProcessors = configSchemaFromCurrent.getProcessors();
- List<ProcessorSchema> v1Processors = configSchemaUpgradedFromV1.getProcessors();
+ List<ProcessorSchema> currentProcessors = configSchemaFromCurrent.getProcessGroupSchema().getProcessors();
+ List<ProcessorSchema> v1Processors = configSchemaUpgradedFromV1.getProcessGroupSchema().getProcessors();
assertEquals(currentProcessors.size(), v1Processors.size());
// V1 doesn't have ids so we need to map the autogenerated ones to the ones from the template
@@ -291,11 +296,12 @@ public class ConfigMainTest {
v1IdToCurrentIdMap.put(v1Processor.getId(), currentProcessor.getId());
v1Processor.setId(currentProcessor.getId());
}
- configSchemaUpgradedFromV1.getRemoteProcessingGroups().stream().flatMap(g -> g.getInputPorts().stream()).map(RemoteInputPortSchema::getId).sequential()
+
+ configSchemaUpgradedFromV1.getProcessGroupSchema().getRemoteProcessingGroups().stream().flatMap(g -> g.getInputPorts().stream()).map(RemoteInputPortSchema::getId).sequential()
.forEach(id -> v1IdToCurrentIdMap.put(id, id));
- List<ConnectionSchema> currentConnections = configSchemaFromCurrent.getConnections();
- List<ConnectionSchema> v1Connections = configSchemaUpgradedFromV1.getConnections();
+ List<ConnectionSchema> currentConnections = configSchemaFromCurrent.getProcessGroupSchema().getConnections();
+ List<ConnectionSchema> v1Connections = configSchemaUpgradedFromV1.getProcessGroupSchema().getConnections();
// Update source and dest ids, can set connection id equal because it isn't referenced elsewhere
assertEquals(currentConnections.size(), v1Connections.size());
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/dto/PortSchemaFunctionTest.java
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/dto/PortSchemaFunctionTest.java b/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/dto/PortSchemaFunctionTest.java
new file mode 100644
index 0000000..05c83cb
--- /dev/null
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/java/org/apache/nifi/minifi/toolkit/configuration/dto/PortSchemaFunctionTest.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.dto;
+
+import org.apache.nifi.minifi.commons.schema.PortSchema;
+import org.apache.nifi.web.api.dto.PortDTO;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class PortSchemaFunctionTest {
+ private String testId;
+ private String testName;
+ private String testWrapperName;
+ private PortDTO portDTO;
+ private PortSchemaFunction portSchemaFunction;
+
+ @Before
+ public void setup() {
+ testId = "testId";
+ testName = "testName";
+ testWrapperName = "testWrapperName";
+ portDTO = new PortDTO();
+ portDTO.setId(testId);
+ portDTO.setName(testName);
+ portSchemaFunction = new PortSchemaFunction(testWrapperName);
+ }
+
+ @Test
+ public void testFullMap() {
+ PortSchema portSchema = portSchemaFunction.apply(portDTO);
+ assertEquals(testId, portSchema.getId());
+ assertEquals(testName, portSchema.getName());
+ assertTrue(portSchema.isValid());
+ }
+
+ @Test
+ public void testNoId() {
+ portDTO.setId(null);
+ PortSchema portSchema = portSchemaFunction.apply(portDTO);
+ assertEquals("", portSchema.getId());
+ assertEquals(testName, portSchema.getName());
+ assertFalse(portSchema.isValid());
+ }
+
+ @Test
+ public void testNoName() {
+ portDTO.setName(null);
+ PortSchema portSchema = portSchemaFunction.apply(portDTO);
+ assertEquals(testId, portSchema.getId());
+ assertEquals("", portSchema.getName());
+ assertTrue(portSchema.isValid());
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/CsvToJson.yml
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/CsvToJson.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/CsvToJson.yml
index b4993e3..d03bd16 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/CsvToJson.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/CsvToJson.yml
@@ -148,6 +148,9 @@ Processors:
- success
Properties:
Delete Attributes Expression:
+Process Groups: []
+Input Ports: []
+Output Ports: []
Connections:
- name: ExtractText/matched/ReplaceText2
id: 56ef3e2e-ee35-4598-9fbe-ae86050960b0
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/DecompressionCircularFlow.yml
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/DecompressionCircularFlow.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/DecompressionCircularFlow.yml
index fdec427..d2f90b2 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/DecompressionCircularFlow.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/DecompressionCircularFlow.yml
@@ -187,6 +187,9 @@ Processors:
Compression Level:
Mode: decompress
Update Filename:
+Process Groups: []
+Input Ports: []
+Output Ports: []
Connections:
- name: Compressed?/gzip/Uncompress GZIP
id: 5de215d5-9f7e-414b-98aa-2edaa0514d99
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml
index dcd75be..aa7d6d5 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/InvokeHttpMiNiFiTemplateTest.yml
@@ -166,6 +166,9 @@ Processors:
Properties:
Delete Attributes Expression:
q: nifi
+Process Groups: []
+Input Ports: []
+Output Ports: []
Connections:
- name: Route On Status Code/200/LogAttribute
id: 3039718a-bb40-4811-9b74-ecbe926daae8
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/31855bbc/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MultipleRelationships.yml
----------------------------------------------------------------------
diff --git a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MultipleRelationships.yml b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MultipleRelationships.yml
index 2850e67..12ed7e1 100644
--- a/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MultipleRelationships.yml
+++ b/minifi-toolkit/minifi-toolkit-configuration/src/test/resources/MultipleRelationships.yml
@@ -114,6 +114,9 @@ Processors:
Properties:
Delete Attributes Expression:
filename: abc
+Process Groups: []
+Input Ports: []
+Output Ports: []
Connections:
- name: GenerateFlowFile/success/UpdateAttribute
id: 7c79cce3-0157-1000-0000-000000000000