You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2017/07/06 17:44:40 UTC
nifi git commit: NIFI-4151: Updated UpdateAttribute to only create
JAXB Context once;
Minor performance tweaks to standard validators and StatusMerge.prettyPrint;
updated AbstractConfiguredComponent to not create a new ValidationContext each
time that va
Repository: nifi
Updated Branches:
refs/heads/master cff81c0cd -> ba56774fa
NIFI-4151: Updated UpdateAttribute to only create JAXB Context once; Minor performance tweaks to standard validators and StatusMerge.prettyPrint; updated AbstractConfiguredComponent to not create a new ValidationContext each time that validate is called but only when needed; updated FlowController, StandardControllerServiceProvider, and StandardProcessGroup so that component lookups can be performed using a ConcurrentMap at FlowController level instead of having to perform a depth-first search through all ProcessGroups when calling findProcessor(), findProcessGroup(), findXYZ()
This closes #1979
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ba56774f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ba56774f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ba56774f
Branch: refs/heads/master
Commit: ba56774fa1c16d935f592df482e74bfa1564b5a3
Parents: cff81c0
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jul 5 11:24:01 2017 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Thu Jul 6 13:43:56 2017 -0400
----------------------------------------------------------------------
.../nifi/processor/util/StandardValidators.java | 13 +-
.../nifi/cluster/manager/StatusMerger.java | 28 +++-
.../controller/AbstractConfiguredComponent.java | 34 ++++-
.../apache/nifi/controller/FlowController.java | 92 ++++++++++++-
.../nifi/controller/StandardProcessorNode.java | 8 +-
.../service/StandardControllerServiceNode.java | 1 +
.../StandardControllerServiceProvider.java | 30 +++--
.../nifi/groups/StandardProcessGroup.java | 135 ++++++++++---------
.../TestStandardProcessScheduler.java | 63 ++++++---
.../TestStandardControllerServiceProvider.java | 47 +++++--
.../service/mock/MockProcessGroup.java | 22 ++-
.../update/attributes/serde/CriteriaSerDe.java | 15 ++-
12 files changed, 352 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/ba56774f/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
index d0170b8..a596330 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
@@ -391,6 +391,8 @@ public class StandardValidators {
}
public static final Validator TIME_PERIOD_VALIDATOR = new Validator() {
+ private final Pattern TIME_DURATION_PATTERN = Pattern.compile(FormatUtils.TIME_DURATION_REGEX);
+
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
@@ -400,7 +402,7 @@ public class StandardValidators {
if (input == null) {
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Time Period cannot be null").build();
}
- if (Pattern.compile(FormatUtils.TIME_DURATION_REGEX).matcher(input.toLowerCase()).matches()) {
+ if (TIME_DURATION_PATTERN.matcher(input.toLowerCase()).matches()) {
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
} else {
return new ValidationResult.Builder()
@@ -416,6 +418,8 @@ public class StandardValidators {
};
public static final Validator DATA_SIZE_VALIDATOR = new Validator() {
+ private final Pattern DATA_SIZE_PATTERN = Pattern.compile(DataUnit.DATA_SIZE_REGEX);
+
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
@@ -430,7 +434,7 @@ public class StandardValidators {
.explanation("Data Size cannot be null")
.build();
}
- if (Pattern.compile(DataUnit.DATA_SIZE_REGEX).matcher(input.toUpperCase()).matches()) {
+ if (DATA_SIZE_PATTERN.matcher(input.toUpperCase()).matches()) {
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
} else {
return new ValidationResult.Builder()
@@ -706,8 +710,7 @@ public class StandardValidators {
//
//
static class TimePeriodValidator implements Validator {
-
- private final Pattern pattern;
+ private static final Pattern pattern = Pattern.compile(FormatUtils.TIME_DURATION_REGEX);
private final long minNanos;
private final long maxNanos;
@@ -716,8 +719,6 @@ public class StandardValidators {
private final String maxValueEnglish;
public TimePeriodValidator(final long minValue, final TimeUnit minTimeUnit, final long maxValue, final TimeUnit maxTimeUnit) {
- pattern = Pattern.compile(FormatUtils.TIME_DURATION_REGEX);
-
this.minNanos = TimeUnit.NANOSECONDS.convert(minValue, minTimeUnit);
this.maxNanos = TimeUnit.NANOSECONDS.convert(maxValue, maxTimeUnit);
this.minValueEnglish = minValue + " " + minTimeUnit.toString();
http://git-wip-us.apache.org/repos/asf/nifi/blob/ba56774f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
index a876d51..9cceaf7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
@@ -61,6 +61,12 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
public class StatusMerger {
+ private static final String ZERO_COUNT = "0";
+ private static final String ZERO_BYTES = "0 bytes";
+ private static final String ZERO_COUNT_AND_BYTES = "0 (0 bytes)";
+ private static final String EMPTY_COUNT = "-";
+ private static final String EMPTY_BYTES = "-";
+
public static void merge(final ControllerStatusDTO target, final ControllerStatusDTO toMerge) {
if (target == null || toMerge == null) {
return;
@@ -743,14 +749,32 @@ public class StatusMerger {
}
public static String formatCount(final Integer intStatus) {
- return intStatus == null ? "-" : FormatUtils.formatCount(intStatus);
+ if (intStatus == null) {
+ return EMPTY_COUNT;
+ }
+ if (intStatus == 0) {
+ return ZERO_COUNT;
+ }
+
+ return FormatUtils.formatCount(intStatus);
}
public static String formatDataSize(final Long longStatus) {
- return longStatus == null ? "-" : FormatUtils.formatDataSize(longStatus);
+ if (longStatus == null) {
+ return EMPTY_BYTES;
+ }
+ if (longStatus == 0L) {
+ return ZERO_BYTES;
+ }
+
+ return FormatUtils.formatDataSize(longStatus);
}
public static String prettyPrint(final Integer count, final Long bytes) {
+ if (count != null && bytes != null && count == 0 && bytes == 0L) {
+ return ZERO_COUNT_AND_BYTES;
+ }
+
return formatCount(count) + " (" + formatDataSize(bytes) + ")";
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ba56774f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
index 76c5d7a..2aa84f8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
@@ -61,6 +61,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
private final ControllerServiceProvider serviceProvider;
private final AtomicReference<String> name;
private final AtomicReference<String> annotationData = new AtomicReference<>();
+ private final AtomicReference<ValidationContext> validationContext = new AtomicReference<>();
private final String componentType;
private final String componentCanonicalClass;
private final VariableRegistry variableRegistry;
@@ -118,6 +119,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
@Override
public void setAnnotationData(final String data) {
+ invalidateValidationContext();
annotationData.set(CharacterFilterUtils.filterInvalidXmlCharacters(data));
}
@@ -435,6 +437,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+ invalidateValidationContext();
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) {
getComponent().onPropertyModified(descriptor, oldValue, newValue);
}
@@ -449,8 +452,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
@Override
public boolean isValid() {
- final Collection<ValidationResult> validationResults = validate(validationContextFactory.newValidationContext(
- getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier()));
+ final Collection<ValidationResult> validationResults = validate(getValidationContext());
for (final ValidationResult result : validationResults) {
if (!result.isValid()) {
@@ -470,8 +472,13 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
final List<ValidationResult> results = new ArrayList<>();
lock.lock();
try {
- final ValidationContext validationContext = validationContextFactory.newValidationContext(
- serviceIdentifiersNotToValidate, getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier());
+ final ValidationContext validationContext;
+ if (serviceIdentifiersNotToValidate == null || serviceIdentifiersNotToValidate.isEmpty()) {
+ validationContext = getValidationContext();
+ } else {
+ validationContext = getValidationContextFactory().newValidationContext(serviceIdentifiersNotToValidate,
+ getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier());
+ }
final Collection<ValidationResult> validationResults;
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) {
@@ -515,6 +522,25 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
return this.validationContextFactory;
}
+ protected void invalidateValidationContext() {
+ this.validationContext.set(null);
+ }
+
+ protected ValidationContext getValidationContext() {
+ while (true) {
+ ValidationContext context = this.validationContext.get();
+ if (context != null) {
+ return context;
+ }
+
+ context = getValidationContextFactory().newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier());
+ final boolean updated = validationContext.compareAndSet(null, context);
+ if (updated) {
+ return context;
+ }
+ }
+ }
+
protected VariableRegistry getVariableRegistry() {
return this.variableRegistry;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ba56774f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index aef6d46..9438946 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -18,7 +18,6 @@ package org.apache.nifi.controller;
import static java.util.Objects.requireNonNull;
-import com.sun.jersey.api.client.ClientHandlerException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -47,7 +46,9 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
+
import javax.net.ssl.SSLContext;
+
import org.apache.commons.collections4.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
@@ -248,6 +249,8 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.sun.jersey.api.client.ClientHandlerException;
+
public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider,
QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider, IdentifierLookup, ReloadComponent {
@@ -301,6 +304,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final VariableRegistry variableRegistry;
private final ConcurrentMap<String, ControllerServiceNode> rootControllerServices = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, ProcessorNode> allProcessors = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, ProcessGroup> allProcessGroups = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Connection> allConnections = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Port> allInputPorts = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Port> allOutputPorts = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Funnel> allFunnels = new ConcurrentHashMap<>();
+
private volatile ZooKeeperStateServer zooKeeperStateServer;
// The Heartbeat Bean is used to provide an Atomic Reference to data that is used in heartbeats that may
@@ -532,7 +542,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final ProcessGroup rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(), this, processScheduler,
nifiProperties, encryptor, this, this.variableRegistry);
rootGroup.setName(DEFAULT_ROOT_GROUP_NAME);
- rootGroupRef.set(rootGroup);
+ setRootGroup(rootGroup);
instanceId = ComponentIdGenerator.generateId().toString();
controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository, stateManagerProvider, this.variableRegistry, this.nifiProperties);
@@ -1660,6 +1670,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// update the heartbeat bean
this.heartbeatBeanRef.set(new HeartbeatBean(group, isPrimary()));
+ allProcessGroups.put(group.getIdentifier(), group);
+ allProcessGroups.put(ROOT_GROUP_ID_ALIAS, group);
} finally {
writeLock.unlock();
}
@@ -2364,12 +2376,76 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* @return the process group or null if not group is found
*/
public ProcessGroup getGroup(final String id) {
- requireNonNull(id);
- final ProcessGroup root = getRootGroup();
- final String searchId = id.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id;
- return root == null ? null : root.findProcessGroup(searchId);
+ return allProcessGroups.get(requireNonNull(id));
+ }
+
+ public void onProcessGroupAdded(final ProcessGroup group) {
+ allProcessGroups.put(group.getIdentifier(), group);
+ }
+
+ public void onProcessGroupRemoved(final ProcessGroup group) {
+ allProcessGroups.remove(group.getIdentifier());
+ }
+
+ public void onProcessorAdded(final ProcessorNode procNode) {
+ allProcessors.put(procNode.getIdentifier(), procNode);
+ }
+
+ public void onProcessorRemoved(final ProcessorNode procNode) {
+ allProcessors.remove(procNode.getIdentifier());
+ }
+
+ public ProcessorNode getProcessorNode(final String id) {
+ return allProcessors.get(id);
+ }
+
+ public void onConnectionAdded(final Connection connection) {
+ allConnections.put(connection.getIdentifier(), connection);
+ }
+
+ public void onConnectionRemoved(final Connection connection) {
+ allConnections.remove(connection.getIdentifier());
}
+ public Connection getConnection(final String id) {
+ return allConnections.get(id);
+ }
+
+ public void onInputPortAdded(final Port inputPort) {
+ allInputPorts.put(inputPort.getIdentifier(), inputPort);
+ }
+
+ public void onInputPortRemoved(final Port inputPort) {
+ allInputPorts.remove(inputPort.getIdentifier());
+ }
+
+ public Port getInputPort(final String id) {
+ return allInputPorts.get(id);
+ }
+
+ public void onOutputPortAdded(final Port outputPort) {
+ allOutputPorts.put(outputPort.getIdentifier(), outputPort);
+ }
+
+ public void onOutputPortRemoved(final Port outputPort) {
+ allOutputPorts.remove(outputPort.getIdentifier());
+ }
+
+ public Port getOutputPort(final String id) {
+ return allOutputPorts.get(id);
+ }
+
+ public void onFunnelAdded(final Funnel funnel) {
+ allFunnels.put(funnel.getIdentifier(), funnel);
+ }
+
+ public void onFunnelRemoved(final Funnel funnel) {
+ allFunnels.remove(funnel.getIdentifier());
+ }
+
+ public Funnel getFunnel(final String id) {
+ return allFunnels.get(id);
+ }
/**
* Returns the status of all components in the controller. This request is
* not in the context of a user so the results will be unfiltered.
@@ -3487,7 +3563,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
public int getActiveThreadCount() {
- return getGroupStatus(getRootGroupId()).getActiveThreadCount();
+ final int timerDrivenCount = timerDrivenEngineRef.get().getActiveCount();
+ final int eventDrivenCount = eventDrivenEngineRef.get().getActiveCount();
+ return timerDrivenCount + eventDrivenCount;
}
private RepositoryStatusReport getProcessorStats() {
http://git-wip-us.apache.org/repos/asf/nifi/blob/ba56774f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 6d96a5c..1ff09d7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -964,9 +964,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override
public boolean isValid() {
try {
- final ValidationContext validationContext = this.getValidationContextFactory()
- .newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier());
-
+ final ValidationContext validationContext = getValidationContext();
final Collection<ValidationResult> validationResults = super.validate(validationContext);
for (final ValidationResult result : validationResults) {
@@ -1011,8 +1009,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
// Processors may go invalid while RUNNING, but only validating while STOPPED is a trade-off
// we are willing to make in order to save on validation costs that would be unnecessary most of the time.
if (getScheduledState() == ScheduledState.STOPPED) {
- final ValidationContext validationContext = this.getValidationContextFactory()
- .newValidationContext(getProperties(), getAnnotationData(), getProcessGroup().getIdentifier(), getIdentifier());
+ final ValidationContext validationContext = getValidationContext();
final Collection<ValidationResult> validationResults = super.validate(validationContext);
@@ -1111,6 +1108,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override
public synchronized void setProcessGroup(final ProcessGroup group) {
this.processGroup.set(group);
+ invalidateValidationContext();
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/ba56774f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index e399e7f..fa4ab84 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -208,6 +208,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
writeLock.lock();
try {
this.processGroup = group;
+ invalidateValidationContext();
} finally {
writeLock.unlock();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ba56774f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 2d006cc..4011113 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -30,6 +30,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -79,6 +81,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
private final FlowController flowController;
private final NiFiProperties nifiProperties;
+ private final ConcurrentMap<String, ControllerServiceNode> serviceCache = new ConcurrentHashMap<>();
+
public StandardControllerServiceProvider(final FlowController flowController, final ProcessScheduler scheduler, final BulletinRepository bulletinRepo,
final StateManagerProvider stateManagerProvider, final VariableRegistry variableRegistry, final NiFiProperties nifiProperties) {
@@ -158,6 +162,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
}
+ serviceCache.put(id, serviceNode);
+
return serviceNode;
} catch (final Throwable t) {
throw new ControllerServiceInstantiationException(t);
@@ -222,6 +228,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedLoggableComponent, proxiedLoggableComponent, invocationHandler, id,
new StandardValidationContextFactory(this, variableRegistry), this, componentType, type, variableRegistry, flowController, true);
+
+ serviceCache.putIfAbsent(id, serviceNode);
return serviceNode;
}
@@ -459,12 +467,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public ControllerService getControllerServiceForComponent(final String serviceIdentifier, final String componentId) {
- final ProcessGroup rootGroup = getRootGroup();
-
// Find the Process Group that owns the component.
ProcessGroup groupOfInterest = null;
- final ProcessorNode procNode = rootGroup.findProcessor(componentId);
+ final ProcessorNode procNode = flowController.getProcessorNode(componentId);
if (procNode == null) {
final ControllerServiceNode serviceNode = getControllerServiceNode(componentId);
if (serviceNode == null) {
@@ -523,7 +529,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
return rootServiceNode;
}
- return getRootGroup().findControllerService(serviceIdentifier);
+ return serviceCache.get(serviceIdentifier);
}
@Override
@@ -533,15 +539,16 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
serviceNodes = flowController.getRootControllerServices();
} else {
ProcessGroup group = getRootGroup();
- if (!FlowController.ROOT_GROUP_ID_ALIAS.equals(groupId) && !group.getIdentifier().equals(groupId)) {
+ if (FlowController.ROOT_GROUP_ID_ALIAS.equals(groupId) || group.getIdentifier().equals(groupId)) {
+ serviceNodes = new HashSet<>(serviceCache.values());
+ } else {
group = group.findProcessGroup(groupId);
- }
+ if (group == null) {
+ return Collections.emptySet();
+ }
- if (group == null) {
- return Collections.emptySet();
+ serviceNodes = group.getControllerServices(true);
}
-
- serviceNodes = group.getControllerServices(true);
}
final Set<String> identifiers = new HashSet<>();
@@ -570,13 +577,14 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
group.removeControllerService(serviceNode);
ExtensionManager.removeInstanceClassLoader(serviceNode.getIdentifier());
+ serviceCache.remove(serviceNode.getIdentifier());
}
@Override
public Set<ControllerServiceNode> getAllControllerServices() {
final Set<ControllerServiceNode> allServices = new HashSet<>();
allServices.addAll(flowController.getRootControllerServices());
- allServices.addAll(getRootGroup().findAllControllerServices());
+ allServices.addAll(serviceCache.values());
return allServices;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ba56774f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 452f3cc..2907704 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -405,6 +405,7 @@ public final class StandardProcessGroup implements ProcessGroup {
port.setProcessGroup(this);
inputPorts.put(requireNonNull(port).getIdentifier(), port);
+ flowController.onInputPortAdded(port);
} finally {
writeLock.unlock();
}
@@ -439,6 +440,7 @@ public final class StandardProcessGroup implements ProcessGroup {
throw new IllegalStateException(port.getIdentifier() + " is not an Input Port of this Process Group");
}
+ flowController.onInputPortRemoved(port);
LOG.info("Input Port {} removed from flow", port);
} finally {
writeLock.unlock();
@@ -484,6 +486,7 @@ public final class StandardProcessGroup implements ProcessGroup {
port.setProcessGroup(this);
outputPorts.put(port.getIdentifier(), port);
+ flowController.onOutputPortAdded(port);
} finally {
writeLock.unlock();
}
@@ -509,6 +512,7 @@ public final class StandardProcessGroup implements ProcessGroup {
throw new IllegalStateException(port.getIdentifier() + " is not an Output Port of this Process Group");
}
+ flowController.onOutputPortRemoved(port);
LOG.info("Output Port {} removed from flow", port);
} finally {
writeLock.unlock();
@@ -545,6 +549,7 @@ public final class StandardProcessGroup implements ProcessGroup {
try {
group.setParent(this);
processGroups.put(Objects.requireNonNull(group).getIdentifier(), group);
+ flowController.onProcessGroupAdded(group);
} finally {
writeLock.unlock();
}
@@ -584,6 +589,7 @@ public final class StandardProcessGroup implements ProcessGroup {
removeComponents(group);
processGroups.remove(group.getIdentifier());
+ flowController.onProcessGroupRemoved(group);
LOG.info("{} removed from flow", group);
} finally {
writeLock.unlock();
@@ -704,6 +710,7 @@ public final class StandardProcessGroup implements ProcessGroup {
processor.setProcessGroup(this);
processors.put(processorId, processor);
+ flowController.onProcessorAdded(processor);
} finally {
writeLock.unlock();
}
@@ -745,6 +752,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
processors.remove(id);
+ flowController.onProcessorRemoved(processor);
LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers();
final StateManagerProvider stateManagerProvider = flowController.getStateManagerProvider();
@@ -884,6 +892,7 @@ public final class StandardProcessGroup implements ProcessGroup {
destination.addConnection(connection);
}
connections.put(connection.getIdentifier(), connection);
+ flowController.onConnectionAdded(connection);
} finally {
writeLock.unlock();
}
@@ -943,6 +952,7 @@ public final class StandardProcessGroup implements ProcessGroup {
// remove the connection from our map
connections.remove(connection.getIdentifier());
LOG.info("{} removed from flow", connection);
+ flowController.onConnectionRemoved(connection);
} finally {
writeLock.unlock();
}
@@ -970,25 +980,21 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public Connection findConnection(final String id) {
- return findConnection(id, this);
- }
-
- private Connection findConnection(final String id, final ProcessGroup start) {
- Connection connection = start.getConnection(id);
- if (connection != null) {
- return connection;
+ final Connection connection = flowController.getConnection(id);
+ if (connection == null) {
+ return null;
}
- for (final ProcessGroup group : start.getProcessGroups()) {
- connection = findConnection(id, group);
- if (connection != null) {
- return connection;
- }
+ // We found a Connection in the Controller, but we only want to return it if
+ // the Process Group is this or is a child of this.
+ if (isOwner(connection.getProcessGroup())) {
+ return connection;
}
return null;
}
+
@Override
public List<Connection> findAllConnections() {
return findAllConnections(this);
@@ -1386,19 +1392,19 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public ProcessGroup findProcessGroup(final String id) {
- return findProcessGroup(requireNonNull(id), this);
- }
+ if (requireNonNull(id).equals(getIdentifier())) {
+ return this;
+ }
- private ProcessGroup findProcessGroup(final String id, final ProcessGroup start) {
- if (id.equals(start.getIdentifier())) {
- return start;
+ final ProcessGroup group = flowController.getGroup(id);
+ if (group == null) {
+ return null;
}
- for (final ProcessGroup group : start.getProcessGroups()) {
- final ProcessGroup matching = findProcessGroup(id, group);
- if (matching != null) {
- return matching;
- }
+ // We found a Processor in the Controller, but we only want to return it if
+ // the Process Group is this or is a child of this.
+ if (isOwner(group.getParent())) {
+ return group;
}
return null;
@@ -1453,23 +1459,30 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public ProcessorNode findProcessor(final String id) {
- return findProcessor(id, this);
- }
+ final ProcessorNode node = flowController.getProcessorNode(id);
+ if (node == null) {
+ return null;
+ }
- private ProcessorNode findProcessor(final String id, final ProcessGroup start) {
- ProcessorNode node = start.getProcessor(id);
- if (node != null) {
+ // We found a Processor in the Controller, but we only want to return it if
+ // the Process Group is this or is a child of this.
+ if (isOwner(node.getProcessGroup())) {
return node;
}
- for (final ProcessGroup group : start.getProcessGroups()) {
- node = findProcessor(id, group);
- if (node != null) {
- return node;
- }
+ return null;
+ }
+
+ private boolean isOwner(ProcessGroup owner) {
+ while (owner != this && owner != null) {
+ owner = owner.getParent();
}
- return null;
+ if (owner == this) {
+ return true;
+ }
+
+ return false;
}
@Override
@@ -1521,6 +1534,7 @@ public final class StandardProcessGroup implements ProcessGroup {
return null;
}
+ @Override
public RemoteGroupPort findRemoteGroupPort(final String identifier) {
return findRemoteGroupPort(identifier, this);
}
@@ -1584,7 +1598,16 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public Port findInputPort(final String id) {
- return findPort(id, this, new InputPortRetriever());
+ final Port port = flowController.getInputPort(id);
+ if (port == null) {
+ return null;
+ }
+
+ if (isOwner(port.getProcessGroup())) {
+ return port;
+ }
+
+ return null;
}
@Override
@@ -1602,7 +1625,16 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public Port findOutputPort(final String id) {
- return findPort(id, this, new OutputPortRetriever());
+ final Port port = flowController.getOutputPort(id);
+ if (port == null) {
+ return null;
+ }
+
+ if (isOwner(port.getProcessGroup())) {
+ return port;
+ }
+
+ return null;
}
@Override
@@ -1674,21 +1706,6 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
- private Port findPort(final String id, final ProcessGroup group, final PortRetriever retriever) {
- Port port = retriever.getPort(group, id);
- if (port != null) {
- return port;
- }
-
- for (final ProcessGroup childGroup : group.getProcessGroups()) {
- port = findPort(id, childGroup, retriever);
- if (port != null) {
- return port;
- }
- }
-
- return null;
- }
private Port getPortByName(final String name, final ProcessGroup group, final PortRetriever retriever) {
for (final Port port : retriever.getPorts(group)) {
@@ -1716,6 +1733,7 @@ public final class StandardProcessGroup implements ProcessGroup {
funnel.setProcessGroup(this);
funnels.put(funnel.getIdentifier(), funnel);
+ flowController.onFunnelAdded(funnel);
if (autoStart) {
startFunnel(funnel);
@@ -1737,25 +1755,19 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public Funnel findFunnel(final String id) {
- return findFunnel(id, this);
- }
-
- private Funnel findFunnel(final String id, final ProcessGroup start) {
- Funnel funnel = start.getFunnel(id);
- if (funnel != null) {
+ final Funnel funnel = flowController.getFunnel(id);
+ if (funnel == null) {
return funnel;
}
- for (final ProcessGroup group : start.getProcessGroups()) {
- funnel = findFunnel(id, group);
- if (funnel != null) {
- return funnel;
- }
+ if (isOwner(funnel.getProcessGroup())) {
+ return funnel;
}
return null;
}
+
@Override
public ControllerServiceNode findControllerService(final String id) {
return findControllerService(id, this);
@@ -1814,6 +1826,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
funnels.remove(funnel.getIdentifier());
+ flowController.onFunnelRemoved(funnel);
LOG.info("{} removed from flow", funnel);
} finally {
writeLock.unlock();
http://git-wip-us.apache.org/repos/asf/nifi/blob/ba56774f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
index b024e52..dfd627f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -16,7 +16,27 @@
*/
package org.apache.nifi.controller.scheduling;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.io.FileUtils;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -61,29 +81,13 @@ import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.NiFiProperties;
+import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.io.FileUtils;
-import org.junit.After;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public class TestStandardProcessScheduler {
@@ -121,7 +125,26 @@ public class TestStandardProcessScheduler {
taskNode = new StandardReportingTaskNode(loggableComponent, UUID.randomUUID().toString(), null, scheduler, validationContextFactory, variableRegistry, reloadComponent);
controller = Mockito.mock(FlowController.class);
- rootGroup = new MockProcessGroup();
+
+ final ConcurrentMap<String, ProcessorNode> processorMap = new ConcurrentHashMap<>();
+ Mockito.doAnswer(new Answer<ProcessorNode>() {
+ @Override
+ public ProcessorNode answer(InvocationOnMock invocation) throws Throwable {
+ final String id = invocation.getArgumentAt(0, String.class);
+ return processorMap.get(id);
+ }
+ }).when(controller).getProcessorNode(Mockito.anyString());
+
+ Mockito.doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ final ProcessorNode procNode = invocation.getArgumentAt(0, ProcessorNode.class);
+ processorMap.putIfAbsent(procNode.getIdentifier(), procNode);
+ return null;
+ }
+ }).when(controller).onProcessorAdded(Mockito.any(ProcessorNode.class));
+
+ rootGroup = new MockProcessGroup(controller);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(rootGroup);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ba56774f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index 3a28cb0..e82085e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -42,9 +42,12 @@ import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.beans.PropertyDescriptor;
import java.util.Arrays;
@@ -55,6 +58,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -88,6 +93,7 @@ public class TestStandardControllerServiceProvider {
private static VariableRegistry variableRegistry = VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY;
private static NiFiProperties niFiProperties;
private static Bundle systemBundle;
+ private FlowController controller;
@BeforeClass
public static void setNiFiProps() {
@@ -99,6 +105,29 @@ public class TestStandardControllerServiceProvider {
ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet());
}
+ @Before
+ public void setup() {
+ controller = Mockito.mock(FlowController.class);
+
+ final ConcurrentMap<String, ProcessorNode> processorMap = new ConcurrentHashMap<>();
+ Mockito.doAnswer(new Answer<ProcessorNode>() {
+ @Override
+ public ProcessorNode answer(InvocationOnMock invocation) throws Throwable {
+ final String id = invocation.getArgumentAt(0, String.class);
+ return processorMap.get(id);
+ }
+ }).when(controller).getProcessorNode(Mockito.anyString());
+
+ Mockito.doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ final ProcessorNode procNode = invocation.getArgumentAt(0, ProcessorNode.class);
+ processorMap.putIfAbsent(procNode.getIdentifier(), procNode);
+ return null;
+ }
+ }).when(controller).onProcessorAdded(Mockito.any(ProcessorNode.class));
+ }
+
private StandardProcessScheduler createScheduler() {
return new StandardProcessScheduler(null, null, stateManagerProvider, variableRegistry, niFiProperties);
}
@@ -111,7 +140,7 @@ public class TestStandardControllerServiceProvider {
@Test
public void testDisableControllerService() {
- final ProcessGroup procGroup = new MockProcessGroup();
+ final ProcessGroup procGroup = new MockProcessGroup(controller);
final FlowController controller = Mockito.mock(FlowController.class);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
@@ -127,7 +156,7 @@ public class TestStandardControllerServiceProvider {
@Test(timeout = 10000)
public void testEnableDisableWithReference() {
- final ProcessGroup group = new MockProcessGroup();
+ final ProcessGroup group = new MockProcessGroup(controller);
final FlowController controller = Mockito.mock(FlowController.class);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(group);
@@ -190,7 +219,7 @@ public class TestStandardControllerServiceProvider {
}
public void testEnableReferencingServicesGraph(final ProcessScheduler scheduler) {
- final ProcessGroup procGroup = new MockProcessGroup();
+ final ProcessGroup procGroup = new MockProcessGroup(controller);
final FlowController controller = Mockito.mock(FlowController.class);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
@@ -246,7 +275,7 @@ public class TestStandardControllerServiceProvider {
@Test
public void testOrderingOfServices() {
- final ProcessGroup procGroup = new MockProcessGroup();
+ final ProcessGroup procGroup = new MockProcessGroup(controller);
final FlowController controller = Mockito.mock(FlowController.class);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
@@ -405,7 +434,7 @@ public class TestStandardControllerServiceProvider {
new StandardValidationContextFactory(serviceProvider, null), scheduler, serviceProvider, niFiProperties,
VariableRegistry.EMPTY_REGISTRY, reloadComponent);
- final ProcessGroup group = new StandardProcessGroup(UUID.randomUUID().toString(), serviceProvider, scheduler, null, null, null, variableRegistry);
+ final ProcessGroup group = new StandardProcessGroup(UUID.randomUUID().toString(), serviceProvider, scheduler, null, null, Mockito.mock(FlowController.class), variableRegistry);
group.addProcessor(procNode);
procNode.setProcessGroup(group);
@@ -414,7 +443,7 @@ public class TestStandardControllerServiceProvider {
@Test
public void testEnableReferencingComponents() {
- final ProcessGroup procGroup = new MockProcessGroup();
+ final ProcessGroup procGroup = new MockProcessGroup(controller);
final FlowController controller = Mockito.mock(FlowController.class);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
@@ -442,7 +471,7 @@ public class TestStandardControllerServiceProvider {
FlowController controller = Mockito.mock(FlowController.class);
StandardControllerServiceProvider provider =
new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties);
- ProcessGroup procGroup = new MockProcessGroup();
+ ProcessGroup procGroup = new MockProcessGroup(controller);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
ControllerServiceNode A = provider.createControllerService(ServiceA.class.getName(), "A",
@@ -493,7 +522,7 @@ public class TestStandardControllerServiceProvider {
FlowController controller = Mockito.mock(FlowController.class);
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null,
stateManagerProvider, variableRegistry, niFiProperties);
- ProcessGroup procGroup = new MockProcessGroup();
+ ProcessGroup procGroup = new MockProcessGroup(controller);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
ControllerServiceNode A = provider.createControllerService(ServiceC.class.getName(), "A",
@@ -535,7 +564,7 @@ public class TestStandardControllerServiceProvider {
FlowController controller = Mockito.mock(FlowController.class);
StandardControllerServiceProvider provider =
new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties);
- ProcessGroup procGroup = new MockProcessGroup();
+ ProcessGroup procGroup = new MockProcessGroup(controller);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1",
http://git-wip-us.apache.org/repos/asf/nifi/blob/ba56774f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
index 838c53c..b6f70a1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
@@ -17,6 +17,13 @@
package org.apache.nifi.controller.service.mock;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.connectable.Connectable;
@@ -25,6 +32,7 @@ import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.connectable.Positionable;
+import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template;
@@ -35,16 +43,14 @@ import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.remote.RemoteGroupPort;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
public class MockProcessGroup implements ProcessGroup {
private final Map<String, ControllerServiceNode> serviceMap = new HashMap<>();
private final Map<String, ProcessorNode> processorMap = new HashMap<>();
+ private final FlowController flowController;
+
+ public MockProcessGroup(final FlowController flowController) {
+ this.flowController = flowController;
+ }
@Override
public Authorizable getParentAuthorizable() {
@@ -260,11 +266,13 @@ public class MockProcessGroup implements ProcessGroup {
public void addProcessor(final ProcessorNode processor) {
processor.setProcessGroup(this);
processorMap.put(processor.getIdentifier(), processor);
+ flowController.onProcessorAdded(processor);
}
@Override
public void removeProcessor(final ProcessorNode processor) {
processorMap.remove(processor.getIdentifier());
+ flowController.onProcessorRemoved(processor);
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/ba56774f/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-model/src/main/java/org/apache/nifi/update/attributes/serde/CriteriaSerDe.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-model/src/main/java/org/apache/nifi/update/attributes/serde/CriteriaSerDe.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-model/src/main/java/org/apache/nifi/update/attributes/serde/CriteriaSerDe.java
index f1cd126..0ad19ce 100644
--- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-model/src/main/java/org/apache/nifi/update/attributes/serde/CriteriaSerDe.java
+++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-model/src/main/java/org/apache/nifi/update/attributes/serde/CriteriaSerDe.java
@@ -35,6 +35,15 @@ import org.apache.nifi.update.attributes.Rule;
*
*/
public class CriteriaSerDe {
+ private static final JAXBContext JAXB_CONTEXT;
+
+ static {
+ try {
+ JAXB_CONTEXT = JAXBContext.newInstance(CriteriaBinding.class);
+ } catch (JAXBException e) {
+ throw new RuntimeException("Could not create JAXB Context for UpdateAttribute", e);
+ }
+ }
/**
* Handles the Criteria binding during the (de)serialization process. This
@@ -86,8 +95,7 @@ public class CriteriaSerDe {
binding.setRules(criteria.getRules());
// serialize the binding
- final JAXBContext context = JAXBContext.newInstance(CriteriaBinding.class);
- final Marshaller marshaller = context.createMarshaller();
+ final Marshaller marshaller = JAXB_CONTEXT.createMarshaller();
marshaller.setProperty(Marshaller.JAXB_FRAGMENT, true);
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
marshaller.marshal(binding, writer);
@@ -110,8 +118,7 @@ public class CriteriaSerDe {
if (string != null && !string.trim().equals("")) {
try {
// deserialize the binding
- final JAXBContext context = JAXBContext.newInstance(CriteriaBinding.class);
- final Unmarshaller unmarshaller = context.createUnmarshaller();
+ final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
final Source source = new StreamSource(new StringReader(string));
final JAXBElement<CriteriaBinding> element = unmarshaller.unmarshal(source, CriteriaBinding.class);