You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/11/27 23:02:37 UTC
nifi-registry git commit: NIFIREG-57: Added EvolvingDifferenceDescriptor vs. StaticDifferenceDescriptor
Repository: nifi-registry
Updated Branches:
refs/heads/master 589253778 -> a2f639f37
NIFIREG-57: Added EvolvingDifferenceDescriptor vs. StaticDifferenceDescriptor
This closes #42.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-registry/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-registry/commit/a2f639f3
Tree: http://git-wip-us.apache.org/repos/asf/nifi-registry/tree/a2f639f3
Diff: http://git-wip-us.apache.org/repos/asf/nifi-registry/diff/a2f639f3
Branch: refs/heads/master
Commit: a2f639f37819b464794fe4f790731026d34c0c55
Parents: 5892537
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Nov 21 14:40:00 2017 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Nov 27 18:02:13 2017 -0500
----------------------------------------------------------------------
.../registry/flow/VersionedFlowCoordinates.java | 15 ++
.../registry/flow/VersionedRemoteGroupPort.java | 23 ++-
.../ConciseEvolvingDifferenceDescriptor.java | 79 +++++++++++
.../flow/diff/EvolvingDifferenceDescriptor.java | 6 +
.../flow/diff/StandardFlowComparator.java | 140 +++++++------------
.../flow/diff/StaticDifferenceDescriptor.java | 29 ++++
6 files changed, 200 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a2f639f3/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowCoordinates.java
----------------------------------------------------------------------
diff --git a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowCoordinates.java b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowCoordinates.java
index ac98933..8e39c5b 100644
--- a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowCoordinates.java
+++ b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowCoordinates.java
@@ -26,6 +26,7 @@ public class VersionedFlowCoordinates {
private String bucketId;
private String flowId;
private int version;
+ private Boolean latest;
@ApiModelProperty("The URL of the Flow Registry that contains the flow")
public String getRegistryUrl() {
@@ -63,6 +64,15 @@ public class VersionedFlowCoordinates {
this.version = version;
}
+ @ApiModelProperty("Whether or not these coordinates point to the latest version of the flow")
+ public Boolean getLatest() {
+ return latest;
+ }
+
+ public void setLatest(Boolean latest) {
+ this.latest = latest;
+ }
+
@Override
public int hashCode() {
return Objects.hash(registryUrl, bucketId, flowId, version);
@@ -83,4 +93,9 @@ public class VersionedFlowCoordinates {
final VersionedFlowCoordinates other = (VersionedFlowCoordinates) obj;
return Objects.equals(registryUrl, other.registryUrl) && Objects.equals(bucketId, other.bucketId) && Objects.equals(flowId, other.flowId) && Objects.equals(version, other.version);
}
+
+ @Override
+ public String toString() {
+ return "VersionedFlowCoordinates[bucketId=" + bucketId + ", flowId=" + flowId + ", version=" + version + ", registryUrl=" + registryUrl + "]";
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a2f639f3/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedRemoteGroupPort.java b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedRemoteGroupPort.java
index 73b037e..ca85ce4 100644
--- a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedRemoteGroupPort.java
+++ b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedRemoteGroupPort.java
@@ -22,11 +22,12 @@ import java.util.Objects;
import io.swagger.annotations.ApiModelProperty;
public class VersionedRemoteGroupPort extends VersionedComponent {
- private String groupId;
+ private String remoteGroupId;
private Integer concurrentlySchedulableTaskCount;
private Boolean useCompression;
private BatchSize batchSize;
private ComponentType componentType;
+ private String targetId;
@ApiModelProperty("The number of task that may transmit flowfiles to the target port concurrently.")
public Integer getConcurrentlySchedulableTaskCount() {
@@ -38,12 +39,12 @@ public class VersionedRemoteGroupPort extends VersionedComponent {
}
@ApiModelProperty("The id of the remote process group that the port resides in.")
- public String getGroupId() {
- return groupId;
+ public String getRemoteGroupId() {
+ return remoteGroupId;
}
- public void setGroupId(String groupId) {
- this.groupId = groupId;
+ public void setRemoteGroupId(String groupId) {
+ this.remoteGroupId = groupId;
}
@@ -61,10 +62,19 @@ public class VersionedRemoteGroupPort extends VersionedComponent {
return batchSize;
}
- public void setBatchSettings(BatchSize batchSize) {
+ public void setBatchSize(BatchSize batchSize) {
this.batchSize = batchSize;
}
+ @ApiModelProperty("The ID of the port on the target NiFi instance")
+ public String getTargetId() {
+ return targetId;
+ }
+
+ public void setTargetId(final String targetId) {
+ this.targetId = targetId;
+ }
+
@Override
public int hashCode() {
return 923847 + String.valueOf(getName()).hashCode();
@@ -88,6 +98,7 @@ public class VersionedRemoteGroupPort extends VersionedComponent {
return componentType;
}
+ @Override
public void setComponentType(final ComponentType componentType) {
if (componentType != ComponentType.REMOTE_INPUT_PORT && componentType != ComponentType.REMOTE_OUTPUT_PORT) {
throw new IllegalArgumentException();
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a2f639f3/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/ConciseEvolvingDifferenceDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/ConciseEvolvingDifferenceDescriptor.java b/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/ConciseEvolvingDifferenceDescriptor.java
new file mode 100644
index 0000000..eb8c874
--- /dev/null
+++ b/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/ConciseEvolvingDifferenceDescriptor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow.diff;
+
+import java.util.Objects;
+
+import org.apache.nifi.registry.flow.VersionedComponent;
+import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
+
+/**
+ * Describes differences between flows as if Flow A is an 'earlier version' of the same flow than Flow B.
+ * This provides verbiage such as "Processor with ID 123 was added to flow."
+ */
+public class ConciseEvolvingDifferenceDescriptor implements DifferenceDescriptor {
+
+ @Override
+ public String describeDifference(final DifferenceType type, final String flowAName, final String flowBName, final VersionedComponent componentA,
+ final VersionedComponent componentB, final Object valueA, final Object valueB) {
+
+ final String description;
+ switch (type) {
+ case COMPONENT_ADDED:
+ description = String.format("%s was added", componentB.getComponentType().getTypeName());
+ break;
+ case COMPONENT_REMOVED:
+ description = String.format("%s was removed", componentA.getComponentType().getTypeName());
+ break;
+ case PROPERTY_ADDED:
+ description = String.format("Property '%s' was added", valueB);
+ break;
+ case PROPERTY_REMOVED:
+ description = String.format("Property '%s' was removed", valueA);
+ break;
+ case VARIABLE_ADDED:
+ description = String.format("Variable '%s' was added", valueB);
+ break;
+ case VARIABLE_REMOVED:
+ description = String.format("Variable '%s' was removed", valueA);
+ break;
+ case VERSIONED_FLOW_COORDINATES_CHANGED:
+ if (valueA instanceof VersionedFlowCoordinates && valueB instanceof VersionedFlowCoordinates) {
+ final VersionedFlowCoordinates coordinatesA = (VersionedFlowCoordinates) valueA;
+ final VersionedFlowCoordinates coordinatesB = (VersionedFlowCoordinates) valueB;
+
+ // If the two vary only by version, then use a more concise message. If anything else is different, then use a full explanation.
+ if (Objects.equals(coordinatesA.getRegistryUrl(), coordinatesB.getRegistryUrl()) && Objects.equals(coordinatesA.getBucketId(), coordinatesB.getBucketId())
+ && Objects.equals(coordinatesA.getFlowId(), coordinatesB.getFlowId()) && coordinatesA.getVersion() != coordinatesB.getVersion()) {
+
+ description = String.format("Flow Version changed from %s to %s", coordinatesA.getVersion(), coordinatesB.getVersion());
+ break;
+ }
+ }
+
+ description = String.format("From '%s' to '%s'", valueA, valueB);
+ break;
+ default:
+ description = String.format("From '%s' to '%s'", valueA, valueB);
+ break;
+ }
+
+ return description;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a2f639f3/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/EvolvingDifferenceDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/EvolvingDifferenceDescriptor.java b/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/EvolvingDifferenceDescriptor.java
index d1309b4..a4d1a65 100644
--- a/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/EvolvingDifferenceDescriptor.java
+++ b/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/EvolvingDifferenceDescriptor.java
@@ -43,6 +43,12 @@ public class EvolvingDifferenceDescriptor implements DifferenceDescriptor {
case PROPERTY_REMOVED:
description = String.format("Property '%s' was removed from %s with ID %s", valueA, componentA.getComponentType().getTypeName(), componentA.getIdentifier());
break;
+ case VARIABLE_ADDED:
+ description = String.format("Variable '%s' was added to Process Group with ID %s", valueB, componentB.getIdentifier());
+ break;
+ case VARIABLE_REMOVED:
+ description = String.format("Variable '%s' was removed from Process Group with ID %s", valueA, componentA.getIdentifier());
+ break;
default:
description = String.format("%s for %s with ID %s from '%s' to '%s'",
type.getDescription(), componentA.getComponentType().getTypeName(), componentA.getIdentifier(), valueA, valueB);
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a2f639f3/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java
----------------------------------------------------------------------
diff --git a/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java b/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java
index a54dab1..260909e 100644
--- a/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java
+++ b/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java
@@ -99,10 +99,11 @@ public class StandardFlowComparator implements FlowComparator {
private boolean compareComponents(final VersionedComponent componentA, final VersionedComponent componentB, final Set<FlowDifference> differences) {
- return compareComponents(componentA, componentB, differences, true);
+ return compareComponents(componentA, componentB, differences, true, true);
}
- private boolean compareComponents(final VersionedComponent componentA, final VersionedComponent componentB, final Set<FlowDifference> differences, final boolean compareNamePos) {
+ private boolean compareComponents(final VersionedComponent componentA, final VersionedComponent componentB, final Set<FlowDifference> differences, final boolean compareNamePos,
+ final boolean compareComments) {
if (componentA == null) {
differences.add(difference(DifferenceType.COMPONENT_ADDED, componentA, componentB, componentA, componentB));
return true;
@@ -113,11 +114,13 @@ public class StandardFlowComparator implements FlowComparator {
return true;
}
- addIfDifferent(differences, DifferenceType.COMMENTS_CHANGED, componentA, componentB, c -> c.getComments());
+ if (compareComments) {
+ addIfDifferent(differences, DifferenceType.COMMENTS_CHANGED, componentA, componentB, VersionedComponent::getComments);
+ }
if (compareNamePos) {
- addIfDifferent(differences, DifferenceType.NAME_CHANGED, componentA, componentB, c -> c.getName());
- addIfDifferent(differences, DifferenceType.POSITION_CHANGED, componentA, componentB, c -> c.getPosition());
+ addIfDifferent(differences, DifferenceType.NAME_CHANGED, componentA, componentB, VersionedComponent::getName);
+ addIfDifferent(differences, DifferenceType.POSITION_CHANGED, componentA, componentB, VersionedComponent::getPosition);
}
return false;
@@ -128,18 +131,18 @@ public class StandardFlowComparator implements FlowComparator {
return;
}
- addIfDifferent(differences, DifferenceType.ANNOTATION_DATA_CHANGED, processorA, processorB, p -> p.getAnnotationData());
- addIfDifferent(differences, DifferenceType.AUTO_TERMINATED_RELATIONSHIPS_CHANGED, processorA, processorB, p -> p.getAutoTerminatedRelationships());
- addIfDifferent(differences, DifferenceType.BULLETIN_LEVEL_CHANGED, processorA, processorB, p -> p.getBulletinLevel());
- addIfDifferent(differences, DifferenceType.BUNDLE_CHANGED, processorA, processorB, p -> p.getBundle());
- addIfDifferent(differences, DifferenceType.CONCURRENT_TASKS_CHANGED, processorA, processorB, p -> p.getConcurrentlySchedulableTaskCount());
- addIfDifferent(differences, DifferenceType.EXECUTION_MODE_CHANGED, processorA, processorB, p -> p.getExecutionNode());
- addIfDifferent(differences, DifferenceType.PENALTY_DURATION_CHANGED, processorA, processorB, p -> p.getPenaltyDuration());
- addIfDifferent(differences, DifferenceType.RUN_DURATION_CHANGED, processorA, processorB, p -> p.getRunDurationMillis());
- addIfDifferent(differences, DifferenceType.SCHEDULING_PERIOD_CHANGED, processorA, processorB, p -> p.getSchedulingPeriod());
- addIfDifferent(differences, DifferenceType.SCHEDULING_STRATEGY_CHANGED, processorA, processorB, p -> p.getSchedulingStrategy());
- addIfDifferent(differences, DifferenceType.STYLE_CHANGED, processorA, processorB, p -> p.getStyle());
- addIfDifferent(differences, DifferenceType.YIELD_DURATION_CHANGED, processorA, processorB, p -> p.getYieldDuration());
+ addIfDifferent(differences, DifferenceType.ANNOTATION_DATA_CHANGED, processorA, processorB, VersionedProcessor::getAnnotationData);
+ addIfDifferent(differences, DifferenceType.AUTO_TERMINATED_RELATIONSHIPS_CHANGED, processorA, processorB, VersionedProcessor::getAutoTerminatedRelationships);
+ addIfDifferent(differences, DifferenceType.BULLETIN_LEVEL_CHANGED, processorA, processorB, VersionedProcessor::getBulletinLevel);
+ addIfDifferent(differences, DifferenceType.BUNDLE_CHANGED, processorA, processorB, VersionedProcessor::getBundle);
+ addIfDifferent(differences, DifferenceType.CONCURRENT_TASKS_CHANGED, processorA, processorB, VersionedProcessor::getConcurrentlySchedulableTaskCount);
+ addIfDifferent(differences, DifferenceType.EXECUTION_MODE_CHANGED, processorA, processorB, VersionedProcessor::getExecutionNode);
+ addIfDifferent(differences, DifferenceType.PENALTY_DURATION_CHANGED, processorA, processorB, VersionedProcessor::getPenaltyDuration);
+ addIfDifferent(differences, DifferenceType.RUN_DURATION_CHANGED, processorA, processorB, VersionedProcessor::getRunDurationMillis);
+ addIfDifferent(differences, DifferenceType.SCHEDULING_PERIOD_CHANGED, processorA, processorB, VersionedProcessor::getSchedulingPeriod);
+ addIfDifferent(differences, DifferenceType.SCHEDULING_STRATEGY_CHANGED, processorA, processorB, VersionedProcessor::getSchedulingStrategy);
+ addIfDifferent(differences, DifferenceType.STYLE_CHANGED, processorA, processorB, VersionedProcessor::getStyle);
+ addIfDifferent(differences, DifferenceType.YIELD_DURATION_CHANGED, processorA, processorB, VersionedProcessor::getYieldDuration);
compareProperties(processorA, processorB, processorA.getProperties(), processorB.getProperties(), differences);
}
@@ -148,8 +151,8 @@ public class StandardFlowComparator implements FlowComparator {
return;
}
- addIfDifferent(differences, DifferenceType.ANNOTATION_DATA_CHANGED, serviceA, serviceB, s -> s.getAnnotationData());
- addIfDifferent(differences, DifferenceType.BUNDLE_CHANGED, serviceA, serviceB, s -> s.getBundle());
+ addIfDifferent(differences, DifferenceType.ANNOTATION_DATA_CHANGED, serviceA, serviceB, VersionedControllerService::getAnnotationData);
+ addIfDifferent(differences, DifferenceType.BUNDLE_CHANGED, serviceA, serviceB, VersionedControllerService::getBundle);
compareProperties(serviceA, serviceB, serviceA.getProperties(), serviceB.getProperties(), differences);
}
@@ -183,39 +186,6 @@ public class StandardFlowComparator implements FlowComparator {
});
}
- private void compareVariables(final VersionedProcessGroup groupA, final VersionedProcessGroup groupB, final Set<FlowDifference> differences) {
-
- final Map<String, String> variablesA = groupA.getVariables();
- final Map<String, String> variablesB = groupB.getVariables();
-
- if (variablesA != null) {
- variablesA.entrySet().stream()
- .forEach(entry -> {
- final String valueA = entry.getValue();
- final String valueB = variablesB.get(entry.getKey());
-
- if (valueA == null && valueB != null) {
- differences.add(difference(DifferenceType.VARIABLE_ADDED, groupA, groupB, entry.getKey(), entry.getKey()));
- } else if (valueA != null && valueB == null) {
- differences.add(difference(DifferenceType.VARIABLE_REMOVED, groupA, groupB, entry.getKey(), entry.getKey()));
- }
- });
- }
-
- if (variablesB != null) {
- variablesB.entrySet().stream()
- .forEach(entry -> {
- final String valueA = variablesA.get(entry.getKey());
- final String valueB = entry.getValue();
-
- // If there are any properties for component B that do not exist for Component A, add those as differences as well.
- if (valueA == null && valueB != null) {
- differences.add(difference(DifferenceType.VARIABLE_ADDED, groupA, groupB, entry.getKey(), entry.getKey()));
- }
- });
- }
- }
-
private void compare(final VersionedFunnel funnelA, final VersionedFunnel funnelB, final Set<FlowDifference> differences) {
if (compareComponents(funnelA, funnelB, differences)) {
@@ -228,10 +198,10 @@ public class StandardFlowComparator implements FlowComparator {
return;
}
- addIfDifferent(differences, DifferenceType.LABEL_VALUE_CHANGED, labelA, labelB, l -> l.getLabel());
- addIfDifferent(differences, DifferenceType.POSITION_CHANGED, labelA, labelB, l -> l.getHeight());
- addIfDifferent(differences, DifferenceType.POSITION_CHANGED, labelA, labelB, l -> l.getWidth());
- addIfDifferent(differences, DifferenceType.STYLE_CHANGED, labelA, labelB, l -> l.getStyle());
+ addIfDifferent(differences, DifferenceType.LABEL_VALUE_CHANGED, labelA, labelB, VersionedLabel::getLabel);
+ addIfDifferent(differences, DifferenceType.POSITION_CHANGED, labelA, labelB, VersionedLabel::getHeight);
+ addIfDifferent(differences, DifferenceType.POSITION_CHANGED, labelA, labelB, VersionedLabel::getWidth);
+ addIfDifferent(differences, DifferenceType.STYLE_CHANGED, labelA, labelB, VersionedLabel::getStyle);
}
private void compare(final VersionedPort portA, final VersionedPort portB, final Set<FlowDifference> differences) {
@@ -241,17 +211,17 @@ public class StandardFlowComparator implements FlowComparator {
}
private void compare(final VersionedRemoteProcessGroup rpgA, final VersionedRemoteProcessGroup rpgB, final Set<FlowDifference> differences) {
- if (compareComponents(rpgA, rpgB, differences)) {
+ if (compareComponents(rpgA, rpgB, differences, true, false)) { // do not compare comments for RPG because they come from remote system, not our local flow
return;
}
- addIfDifferent(differences, DifferenceType.RPG_COMMS_TIMEOUT_CHANGED, rpgA, rpgB, r -> r.getCommunicationsTimeout());
- addIfDifferent(differences, DifferenceType.RPG_NETWORK_INTERFACE_CHANGED, rpgA, rpgB, r -> rpgA.getLocalNetworkInterface());
- addIfDifferent(differences, DifferenceType.RPG_PROXY_HOST_CHANGED, rpgA, rpgB, r -> rpgA.getProxyHost());
- addIfDifferent(differences, DifferenceType.RPG_PROXY_PORT_CHANGED, rpgA, rpgB, r -> rpgA.getProxyPort());
- addIfDifferent(differences, DifferenceType.RPG_PROXY_USER_CHANGED, rpgA, rpgB, r -> rpgA.getProxyUser());
- addIfDifferent(differences, DifferenceType.RPG_TRANSPORT_PROTOCOL_CHANGED, rpgA, rpgB, r -> rpgA.getTransportProtocol());
- addIfDifferent(differences, DifferenceType.YIELD_DURATION_CHANGED, rpgA, rpgB, r -> rpgA.getYieldDuration());
+ addIfDifferent(differences, DifferenceType.RPG_COMMS_TIMEOUT_CHANGED, rpgA, rpgB, VersionedRemoteProcessGroup::getCommunicationsTimeout);
+ addIfDifferent(differences, DifferenceType.RPG_NETWORK_INTERFACE_CHANGED, rpgA, rpgB, VersionedRemoteProcessGroup::getLocalNetworkInterface);
+ addIfDifferent(differences, DifferenceType.RPG_PROXY_HOST_CHANGED, rpgA, rpgB, VersionedRemoteProcessGroup::getProxyHost);
+ addIfDifferent(differences, DifferenceType.RPG_PROXY_PORT_CHANGED, rpgA, rpgB, VersionedRemoteProcessGroup::getProxyPort);
+ addIfDifferent(differences, DifferenceType.RPG_PROXY_USER_CHANGED, rpgA, rpgB, VersionedRemoteProcessGroup::getProxyUser);
+ addIfDifferent(differences, DifferenceType.RPG_TRANSPORT_PROTOCOL_CHANGED, rpgA, rpgB, VersionedRemoteProcessGroup::getTransportProtocol);
+ addIfDifferent(differences, DifferenceType.YIELD_DURATION_CHANGED, rpgA, rpgB, VersionedRemoteProcessGroup::getYieldDuration);
differences.addAll(compareComponents(rpgA.getInputPorts(), rpgB.getInputPorts(), (a, b, diffs) -> compare(a, b, diffs)));
differences.addAll(compareComponents(rpgA.getOutputPorts(), rpgB.getOutputPorts(), (a, b, diffs) -> compare(a, b, diffs)));
@@ -262,14 +232,14 @@ public class StandardFlowComparator implements FlowComparator {
return;
}
- addIfDifferent(differences, DifferenceType.REMOTE_PORT_BATCH_SIZE_CHANGED, portA, portB, p -> p.getBatchSize());
- addIfDifferent(differences, DifferenceType.REMOTE_PORT_COMPRESSION_CHANGED, portA, portB, p -> p.isUseCompression());
- addIfDifferent(differences, DifferenceType.CONCURRENT_TASKS_CHANGED, portA, portB, p -> p.getConcurrentlySchedulableTaskCount());
+ addIfDifferent(differences, DifferenceType.REMOTE_PORT_BATCH_SIZE_CHANGED, portA, portB, VersionedRemoteGroupPort::getBatchSize);
+ addIfDifferent(differences, DifferenceType.REMOTE_PORT_COMPRESSION_CHANGED, portA, portB, VersionedRemoteGroupPort::isUseCompression);
+ addIfDifferent(differences, DifferenceType.CONCURRENT_TASKS_CHANGED, portA, portB, VersionedRemoteGroupPort::getConcurrentlySchedulableTaskCount);
}
private void compare(final VersionedProcessGroup groupA, final VersionedProcessGroup groupB, final Set<FlowDifference> differences, final boolean compareNamePos) {
- if (compareComponents(groupA, groupB, differences, compareNamePos)) {
+ if (compareComponents(groupA, groupB, differences, compareNamePos, true)) {
return;
}
@@ -283,19 +253,17 @@ public class StandardFlowComparator implements FlowComparator {
return;
}
- addIfDifferent(differences, DifferenceType.VERSIONED_FLOW_COORDINATES_CHANGED, groupA, groupB, g -> g.getVersionedFlowCoordinates());
+ addIfDifferent(differences, DifferenceType.VERSIONED_FLOW_COORDINATES_CHANGED, groupA, groupB, VersionedProcessGroup::getVersionedFlowCoordinates);
- differences.addAll(compareComponents(groupA.getConnections(), groupB.getConnections(), (a, b, diffs) -> compare(a, b, diffs)));
- differences.addAll(compareComponents(groupA.getProcessors(), groupB.getProcessors(), (a, b, diffs) -> compare(a, b, diffs)));
- differences.addAll(compareComponents(groupA.getControllerServices(), groupB.getControllerServices(), (a, b, diffs) -> compare(a, b, diffs)));
- differences.addAll(compareComponents(groupA.getFunnels(), groupB.getFunnels(), (a, b, diffs) -> compare(a, b, diffs)));
- differences.addAll(compareComponents(groupA.getInputPorts(), groupB.getInputPorts(), (a, b, diffs) -> compare(a, b, diffs)));
- differences.addAll(compareComponents(groupA.getLabels(), groupB.getLabels(), (a, b, diffs) -> compare(a, b, diffs)));
- differences.addAll(compareComponents(groupA.getOutputPorts(), groupB.getOutputPorts(), (a, b, diffs) -> compare(a, b, diffs)));
+ differences.addAll(compareComponents(groupA.getConnections(), groupB.getConnections(), this::compare));
+ differences.addAll(compareComponents(groupA.getProcessors(), groupB.getProcessors(), this::compare));
+ differences.addAll(compareComponents(groupA.getControllerServices(), groupB.getControllerServices(), this::compare));
+ differences.addAll(compareComponents(groupA.getFunnels(), groupB.getFunnels(), this::compare));
+ differences.addAll(compareComponents(groupA.getInputPorts(), groupB.getInputPorts(), this::compare));
+ differences.addAll(compareComponents(groupA.getLabels(), groupB.getLabels(), this::compare));
+ differences.addAll(compareComponents(groupA.getOutputPorts(), groupB.getOutputPorts(), this::compare));
differences.addAll(compareComponents(groupA.getProcessGroups(), groupB.getProcessGroups(), (a, b, diffs) -> compare(a, b, diffs, true)));
- differences.addAll(compareComponents(groupA.getRemoteProcessGroups(), groupB.getRemoteProcessGroups(), (a, b, diffs) -> compare(a, b, diffs)));
-
- compareVariables(groupA, groupB, differences);
+ differences.addAll(compareComponents(groupA.getRemoteProcessGroups(), groupB.getRemoteProcessGroups(), this::compare));
}
@@ -304,13 +272,13 @@ public class StandardFlowComparator implements FlowComparator {
return;
}
- addIfDifferent(differences, DifferenceType.BACKPRESSURE_DATA_SIZE_THRESHOLD_CHANGED, connectionA, connectionB, c -> c.getBackPressureDataSizeThreshold());
- addIfDifferent(differences, DifferenceType.BACKPRESSURE_OBJECT_THRESHOLD_CHANGED, connectionA, connectionB, c -> c.getBackPressureObjectThreshold());
- addIfDifferent(differences, DifferenceType.BENDPOINTS_CHANGED, connectionA, connectionB, c -> c.getBends());
- addIfDifferent(differences, DifferenceType.DESTINATION_CHANGED, connectionA, connectionB, c -> c.getDestination());
- addIfDifferent(differences, DifferenceType.FLOWFILE_EXPIRATION_CHANGED, connectionA, connectionB, c -> c.getFlowFileExpiration());
- addIfDifferent(differences, DifferenceType.PRIORITIZERS_CHANGED, connectionA, connectionB, c -> c.getPrioritizers());
- addIfDifferent(differences, DifferenceType.SELECTED_RELATIONSHIPS_CHANGED, connectionA, connectionB, c -> c.getSelectedRelationships());
+ addIfDifferent(differences, DifferenceType.BACKPRESSURE_DATA_SIZE_THRESHOLD_CHANGED, connectionA, connectionB, VersionedConnection::getBackPressureDataSizeThreshold);
+ addIfDifferent(differences, DifferenceType.BACKPRESSURE_OBJECT_THRESHOLD_CHANGED, connectionA, connectionB, VersionedConnection::getBackPressureObjectThreshold);
+ addIfDifferent(differences, DifferenceType.BENDPOINTS_CHANGED, connectionA, connectionB, VersionedConnection::getBends);
+ addIfDifferent(differences, DifferenceType.DESTINATION_CHANGED, connectionA, connectionB, VersionedConnection::getDestination);
+ addIfDifferent(differences, DifferenceType.FLOWFILE_EXPIRATION_CHANGED, connectionA, connectionB, VersionedConnection::getFlowFileExpiration);
+ addIfDifferent(differences, DifferenceType.PRIORITIZERS_CHANGED, connectionA, connectionB, VersionedConnection::getPrioritizers);
+ addIfDifferent(differences, DifferenceType.SELECTED_RELATIONSHIPS_CHANGED, connectionA, connectionB, VersionedConnection::getSelectedRelationships);
addIfDifferent(differences, DifferenceType.SOURCE_CHANGED, connectionA, connectionB, c -> c.getSource().getId());
}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a2f639f3/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StaticDifferenceDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StaticDifferenceDescriptor.java b/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StaticDifferenceDescriptor.java
index 42fe5e3..6894923 100644
--- a/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StaticDifferenceDescriptor.java
+++ b/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StaticDifferenceDescriptor.java
@@ -17,7 +17,10 @@
package org.apache.nifi.registry.flow.diff;
+import java.util.Objects;
+
import org.apache.nifi.registry.flow.VersionedComponent;
+import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
/**
* Describes differences between flows as if the flows are two disparate flows that are being
@@ -47,6 +50,32 @@ public class StaticDifferenceDescriptor implements DifferenceDescriptor {
description = String.format("Property '%s' exists for %s with ID %s in %s but not in %s",
valueA, componentA.getComponentType().getTypeName(), componentA.getIdentifier(), flowAName, flowBName);
break;
+ case VARIABLE_ADDED:
+ description = String.format("Variable '%s' exists for Process Group with ID %s in %s but not in %s",
+ valueB, componentB.getIdentifier(), flowBName, flowAName);
+ break;
+ case VARIABLE_REMOVED:
+ description = String.format("Variable '%s' exists for Process Group with ID %s in %s but not in %s",
+ valueA, componentA.getIdentifier(), flowAName, flowBName);
+ break;
+ case VERSIONED_FLOW_COORDINATES_CHANGED:
+ if (valueA instanceof VersionedFlowCoordinates && valueB instanceof VersionedFlowCoordinates) {
+ final VersionedFlowCoordinates coordinatesA = (VersionedFlowCoordinates) valueA;
+ final VersionedFlowCoordinates coordinatesB = (VersionedFlowCoordinates) valueB;
+
+ // If the two vary only by version, then use a more concise message. If anything else is different, then use a full explanation.
+ if (Objects.equals(coordinatesA.getRegistryUrl(), coordinatesB.getRegistryUrl()) && Objects.equals(coordinatesA.getBucketId(), coordinatesB.getBucketId())
+ && Objects.equals(coordinatesA.getFlowId(), coordinatesB.getFlowId()) && coordinatesA.getVersion() != coordinatesB.getVersion()) {
+
+ description = String.format("Flow Version is %s in %s but %s in %s", coordinatesA.getVersion(), flowAName, coordinatesB.getVersion(), flowBName);
+ break;
+ }
+ }
+
+ description = String.format("%s for %s with ID %s; flow '%s' has value %s; flow '%s' has value %s",
+ type.getDescription(), componentA.getComponentType().getTypeName(), componentA.getIdentifier(),
+ flowAName, valueA, flowBName, valueB);
+ break;
default:
description = String.format("%s for %s with ID %s; flow '%s' has value %s; flow '%s' has value %s",
type.getDescription(), componentA.getComponentType().getTypeName(), componentA.getIdentifier(),