You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/22 18:03:52 UTC
[03/79] [abbrv] [partial] incubator-nifi git commit: NIFI-270 Made
all changes identified by adam, mark, joey to prep for a cleaner build
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
deleted file mode 100644
index 071be4d..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ /dev/null
@@ -1,1243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import org.apache.nifi.controller.ProcessScheduler;
-import org.apache.nifi.controller.ValidationContextFactory;
-import org.apache.nifi.controller.ProcessorNode;
-import static java.util.Objects.requireNonNull;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.connectable.Connectable;
-import org.apache.nifi.connectable.ConnectableType;
-import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.connectable.Position;
-import org.apache.nifi.controller.service.ControllerServiceProvider;
-import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.logging.LogLevel;
-import org.apache.nifi.logging.LogRepositoryFactory;
-import org.apache.nifi.nar.NarCloseable;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.annotation.CapabilityDescription;
-import org.apache.nifi.processor.annotation.EventDriven;
-import org.apache.nifi.processor.annotation.SideEffectFree;
-import org.apache.nifi.processor.annotation.SupportsBatching;
-import org.apache.nifi.processor.annotation.TriggerSerially;
-import org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable;
-import org.apache.nifi.processor.annotation.TriggerWhenEmpty;
-import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.util.FormatUtils;
-
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.quartz.CronExpression;
-import org.slf4j.LoggerFactory;
-
-/**
- * ProcessorNode provides thread-safe access to a FlowFileProcessor as it exists
- * within a controlled flow. This node keeps track of the processor, its
- * scheduling information and its relationships to other processors and whatever
- * scheduled futures exist for it. Must be thread safe.
- *
- * @author none
- */
-public class StandardProcessorNode extends ProcessorNode implements Connectable {
-
- // Observer id passed to the log repository when reading or changing the bulletin level.
- public static final String BULLETIN_OBSERVER_ID = "bulletin-observer";
-
- // Unit used by getYieldPeriod(TimeUnit) and getPenalizationPeriod(TimeUnit) when the caller passes null.
- public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
- // Initial values of the yield and penalization periods, assigned in the constructor.
- public static final String DEFAULT_YIELD_PERIOD = "1 sec";
- public static final String DEFAULT_PENALIZATION_PERIOD = "30 sec";
- private final AtomicReference<ProcessGroup> processGroup;
- private final Processor processor;
- private final AtomicReference<String> identifier;
- // Maps each connection for which this node is the source to that connection's destination.
- private final Map<Connection, Connectable> destinations;
- // Maps each relationship to the connections (sourced at this node) registered under it.
- private final Map<Relationship, Set<Connection>> connections;
- // Relationships configured as auto-terminated; read by isAutoTerminated(Relationship).
- private final AtomicReference<Set<Relationship>> undefinedRelationshipsToTerminate;
- // Connections whose destination is this node; a fresh unmodifiable list is stored on every change.
- private final AtomicReference<List<Connection>> incomingConnectionsRef;
- private final ReentrantReadWriteLock rwLock;
- private final Lock readLock;
- private final Lock writeLock;
- private final AtomicBoolean isolated;
- private final AtomicBoolean lossTolerant;
- private final AtomicReference<ScheduledState> scheduledState;
- private final AtomicReference<String> comments;
- private final AtomicReference<String> name;
- private final AtomicReference<Position> position;
- private final AtomicReference<String> annotationData;
- private final AtomicReference<String> schedulingPeriod; // stored as string so it's presented to user as they entered it
- private final AtomicReference<String> yieldPeriod;
- private final AtomicReference<String> penalizationPeriod;
- private final AtomicReference<Map<String, String>> style;
- private final AtomicInteger concurrentTaskCount;
- // Epoch milliseconds; compared against System.currentTimeMillis() when yielding.
- private final AtomicLong yieldExpiration;
- // Scheduling period in nanoseconds, never stored below MINIMUM_SCHEDULING_NANOS.
- private final AtomicLong schedulingNanos;
- // The following flags are derived once, in the constructor, from annotations on the processor class.
- private final boolean triggerWhenEmpty;
- private final boolean sideEffectFree;
- private final boolean triggeredSerially;
- private final boolean triggerWhenAnyDestinationAvailable;
- private final boolean eventDrivenSupported;
- private final boolean batchSupported;
- private final ValidationContextFactory validationContextFactory;
- private final ProcessScheduler processScheduler;
- // Run duration in nanoseconds; read and written under the read/write lock.
- private long runNanos = 0L;
-
- private SchedulingStrategy schedulingStrategy; // guarded by read/write lock
-
- /**
- * Creates a node wrapping the given processor. The node starts in the
- * STOPPED state with a TIMER_DRIVEN strategy, a "0 sec" scheduling period,
- * one concurrent task, the default yield and penalization periods, an
- * empty comment, and the processor's simple class name as its name.
- *
- * @param processor the processor this node manages
- * @param uuid the identifier of this node
- * @param validationContextFactory factory used when validating the processor
- * @param scheduler scheduler notified on yield and queried for active threads
- * @param controllerServiceProvider passed through to the superclass constructor
- */
- StandardProcessorNode(final Processor processor, final String uuid, final ValidationContextFactory validationContextFactory,
- final ProcessScheduler scheduler, final ControllerServiceProvider controllerServiceProvider) {
- super(processor, uuid, validationContextFactory, controllerServiceProvider);
-
- this.processor = processor;
- identifier = new AtomicReference<>(uuid);
- destinations = new HashMap<>();
- connections = new HashMap<>();
- incomingConnectionsRef = new AtomicReference<List<Connection>>(new ArrayList<Connection>());
- scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
- // non-fair lock
- rwLock = new ReentrantReadWriteLock(false);
- readLock = rwLock.readLock();
- writeLock = rwLock.writeLock();
- lossTolerant = new AtomicBoolean(false);
- final Set<Relationship> emptySetOfRelationships = new HashSet<>();
- undefinedRelationshipsToTerminate = new AtomicReference<>(emptySetOfRelationships);
- comments = new AtomicReference<>("");
- name = new AtomicReference<>(processor.getClass().getSimpleName());
- schedulingPeriod = new AtomicReference<>("0 sec");
- schedulingNanos = new AtomicLong(MINIMUM_SCHEDULING_NANOS);
- yieldPeriod = new AtomicReference<>(DEFAULT_YIELD_PERIOD);
- yieldExpiration = new AtomicLong(0L);
- concurrentTaskCount = new AtomicInteger(1);
- position = new AtomicReference<>(new Position(0D, 0D));
- style = new AtomicReference<>(Collections.unmodifiableMap(new HashMap<String, String>()));
- this.processGroup = new AtomicReference<>();
- processScheduler = scheduler;
- annotationData = new AtomicReference<>();
- isolated = new AtomicBoolean(false);
- penalizationPeriod = new AtomicReference<>(DEFAULT_PENALIZATION_PERIOD);
-
- // Capability flags are read once from the annotations present on the processor's class.
- triggerWhenEmpty = processor.getClass().isAnnotationPresent(TriggerWhenEmpty.class);
- sideEffectFree = processor.getClass().isAnnotationPresent(SideEffectFree.class);
- batchSupported = processor.getClass().isAnnotationPresent(SupportsBatching.class);
- triggeredSerially = processor.getClass().isAnnotationPresent(TriggerSerially.class);
- triggerWhenAnyDestinationAvailable = processor.getClass().isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class);
- this.validationContextFactory = validationContextFactory;
- // Must follow the assignments of triggeredSerially and triggerWhenEmpty: event-driven mode is
- // only supported when the processor is annotated EventDriven and carries neither of those annotations.
- eventDrivenSupported = processor.getClass().isAnnotationPresent(EventDriven.class) && !triggeredSerially && !triggerWhenEmpty;
- schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN;
- }
-
- /**
-  * Returns the comments currently stored for this processor instance.
-  *
-  * @return the comments text
-  */
- @Override
- public String getComments() {
-     final String currentComments = comments.get();
-     return currentComments;
- }
-
- /**
-  * Replaces the comments stored for this processor instance. The change is
-  * made while holding the write lock and is refused while the processor is
-  * running.
-  *
-  * @param comments the new comments text
-  * @throws IllegalStateException if the processor is running
-  */
- @Override
- public void setComments(final String comments) {
-     writeLock.lock();
-     try {
-         final boolean running = isRunning();
-         if (running) {
-             throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
-         }
-
-         this.comments.set(comments);
-     } finally {
-         writeLock.unlock();
-     }
- }
-
- /**
-  * @return the scheduled state currently recorded for this node
-  */
- @Override
- public ScheduledState getScheduledState() {
-     final ScheduledState currentState = scheduledState.get();
-     return currentState;
- }
-
- /**
-  * @return the position currently recorded for this node
-  */
- @Override
- public Position getPosition() {
-     final Position currentPosition = position.get();
-     return currentPosition;
- }
-
- /**
-  * Records a new position for this node. No lock is taken and no running
-  * check is performed.
-  *
-  * @param position the new position
-  */
- @Override
- public void setPosition(Position position) {
-     final AtomicReference<Position> positionRef = this.position;
-     positionRef.set(position);
- }
-
- /**
-  * @return the current style map; the stored map is always wrapped as
-  * unmodifiable
-  */
- public Map<String, String> getStyle() {
-     final Map<String, String> currentStyle = style.get();
-     return currentStyle;
- }
-
- /**
-  * Stores an unmodifiable copy of the given style map. A null argument is
-  * ignored and leaves the current style untouched.
-  *
-  * @param style the new style entries, or null to keep the current style
-  */
- public void setStyle(final Map<String, String> style) {
-     if (style == null) {
-         return;
-     }
-
-     final Map<String, String> copy = new HashMap<>(style);
-     this.style.set(Collections.unmodifiableMap(copy));
- }
-
- /**
-  * @return the identifier supplied when this node was constructed
-  */
- @Override
- public String getIdentifier() {
-     final String id = identifier.get();
-     return id;
- }
-
- /**
-  * @return true if flow file content generated by this processor is
-  * considered loss tolerant
-  */
- @Override
- public boolean isLossTolerant() {
-     final boolean tolerant = lossTolerant.get();
-     return tolerant;
- }
-
- /**
-  * @return true if the processor runs only on the primary node
-  */
- public boolean isIsolated() {
-     final boolean primaryOnly = isolated.get();
-     return primaryOnly;
- }
-
- /**
-  * @return true if the processor class carries the {@link TriggerWhenEmpty}
-  * annotation, false otherwise
-  */
- @Override
- public boolean isTriggerWhenEmpty() {
-     return this.triggerWhenEmpty;
- }
-
- /**
-  * @return true if the processor class carries the {@link SideEffectFree}
-  * annotation, false otherwise
-  */
- public boolean isSideEffectFree() {
-     return this.sideEffectFree;
- }
-
- /**
-  * @return true if the processor class carries the {@link SupportsBatching}
-  * annotation, false otherwise
-  */
- @Override
- public boolean isHighThroughputSupported() {
-     return this.batchSupported;
- }
-
- /**
-  * @return true if the processor class carries the
-  * {@link TriggerWhenAnyDestinationAvailable} annotation, false otherwise
-  */
- public boolean isTriggerWhenAnyDestinationAvailable() {
-     return this.triggerWhenAnyDestinationAvailable;
- }
-
- /**
-  * Sets whether flow file content made by this processor is loss tolerant,
-  * i.e. need not be persisted. The change is made under the write lock and
-  * is refused while the processor is running.
-  *
-  * @param lossTolerant the new loss-tolerance flag
-  * @throws IllegalStateException if the processor is running
-  */
- @Override
- public void setLossTolerant(final boolean lossTolerant) {
-     writeLock.lock();
-     try {
-         final boolean running = isRunning();
-         if (running) {
-             throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
-         }
-
-         this.lossTolerant.set(lossTolerant);
-     } finally {
-         writeLock.unlock();
-     }
- }
-
- /**
-  * Sets whether the processor runs on only the primary node. The change is
-  * made under the write lock and is refused while the processor is running.
-  *
-  * @param isolated true to restrict the processor to the primary node
-  * @throws IllegalStateException if the processor is running
-  */
- public void setIsolated(final boolean isolated) {
-     writeLock.lock();
-     try {
-         final boolean running = isRunning();
-         if (running) {
-             throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
-         }
-
-         this.isolated.set(isolated);
-     } finally {
-         writeLock.unlock();
-     }
- }
-
- /**
-  * Reports whether the given relationship is configured as auto-terminated.
-  *
-  * @param relationship the relationship to look up
-  * @return true if the relationship is in the auto-terminated set; false if
-  * it is not, or if no set is stored
-  */
- @Override
- public boolean isAutoTerminated(final Relationship relationship) {
-     final Set<Relationship> autoTerminated = undefinedRelationshipsToTerminate.get();
-     return autoTerminated != null && autoTerminated.contains(relationship);
- }
-
- /**
-  * Replaces the set of auto-terminated relationships with a copy of the
-  * given set. The change is made under the write lock.
-  *
-  * @param terminate the relationships to auto-terminate
-  * @throws IllegalStateException if the processor is running, or if any of
-  * the given relationships already has a connection
-  */
- @Override
- public void setAutoTerminatedRelationships(final Set<Relationship> terminate) {
-     writeLock.lock();
-     try {
-         final boolean running = isRunning();
-         if (running) {
-             throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
-         }
-
-         // A relationship that already feeds a connection cannot also be auto-terminated.
-         for (final Relationship candidate : terminate) {
-             final boolean connected = !getConnections(candidate).isEmpty();
-             if (connected) {
-                 throw new IllegalStateException("Cannot mark relationship '" + candidate.getName() + "' as auto-terminated because Connection already exists with this relationship");
-             }
-         }
-
-         final Set<Relationship> copy = new HashSet<>(terminate);
-         undefinedRelationshipsToTerminate.set(copy);
-     } finally {
-         writeLock.unlock();
-     }
- }
-
- /**
-  * @return an unmodifiable Set of the relationships configured to be
-  * auto-terminated; an empty set if none is stored
-  */
- public Set<Relationship> getAutoTerminatedRelationships() {
-     final Set<Relationship> configured = undefinedRelationshipsToTerminate.get();
-     final Set<Relationship> relationships = (configured == null) ? new HashSet<Relationship>() : configured;
-     return Collections.unmodifiableSet(relationships);
- }
-
- /**
-  * @return the name currently stored for this processor node
-  */
- @Override
- public String getName() {
-     final String currentName = name.get();
-     return currentName;
- }
-
- /**
-  * @return the value of the {@link CapabilityDescription} annotation on the
-  * processor's class, or <code>null</code> if the annotation is absent
-  */
- public String getProcessorDescription() {
-     final CapabilityDescription description = processor.getClass().getAnnotation(CapabilityDescription.class);
-     if (description == null) {
-         return null;
-     }
-     return description.value();
- }
-
- /**
-  * Renames this processor node. The change is made under the write lock and
-  * is refused while the processor is running.
-  *
-  * @param name the new name
-  * @throws IllegalStateException if the processor is running
-  */
- @Override
- public void setName(final String name) {
-     writeLock.lock();
-     try {
-         final boolean running = isRunning();
-         if (running) {
-             throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
-         }
-
-         this.name.set(name);
-     } finally {
-         writeLock.unlock();
-     }
- }
-
- /**
-  * Reports the scheduling period, which is held internally in nanoseconds,
-  * converted to the requested unit.
-  *
-  * @param timeUnit the unit in which to report the scheduling period. If
-  * null, the period is reported in units of {@link #DEFAULT_TIME_UNIT}
-  * @return the schedule period that should elapse before subsequent cycles
-  * of this processor's tasks
-  */
- @Override
- public long getSchedulingPeriod(final TimeUnit timeUnit) {
-     // Honour the documented null default instead of failing with a NullPointerException;
-     // getYieldPeriod(TimeUnit) and getPenalizationPeriod(TimeUnit) apply the same fallback.
-     final TimeUnit effectiveUnit = (timeUnit == null) ? DEFAULT_TIME_UNIT : timeUnit;
-     return effectiveUnit.convert(schedulingNanos.get(), TimeUnit.NANOSECONDS);
- }
-
- /**
-  * @return true if the processor may be run with the event-driven
-  * scheduling strategy; the flag is fixed at construction time and is read
-  * here while holding the read lock
-  */
- public boolean isEventDrivenSupported() {
-     final boolean supported;
-     readLock.lock();
-     try {
-         supported = this.eventDrivenSupported;
-     } finally {
-         readLock.unlock();
-     }
-     return supported;
- }
-
- /**
-  * Updates the Scheduling Strategy used for this Processor and marks the
-  * processor as isolated exactly when the strategy is PRIMARY_NODE_ONLY.
-  * A request for EVENT_DRIVEN on a processor that does not support it is
-  * silently ignored and the current strategy is kept.
-  *
-  * @param schedulingStrategy the strategy to apply
-  *
-  * @throws IllegalStateException if the processor is running (raised by
-  * {@link #setIsolated(boolean)})
-  */
- public void setSchedulingStrategy(final SchedulingStrategy schedulingStrategy) {
-     writeLock.lock();
-     try {
-         // Not valid, so just ignore it. We don't throw an Exception because if a developer changes a Processor so that
-         // it no longer supports EventDriven mode, we don't want the app to fail to startup if it was already in
-         // Event-Driven Mode. Instead, we simply leave the current strategy in place.
-         final boolean unsupportedEventDriven = (schedulingStrategy == SchedulingStrategy.EVENT_DRIVEN) && !eventDrivenSupported;
-         if (!unsupportedEventDriven) {
-             // NOTE(review): the strategy is assigned before setIsolated can throw for a running processor,
-             // so in that case the new strategy is kept even though the call fails.
-             this.schedulingStrategy = schedulingStrategy;
-
-             final boolean primaryNodeOnly = (schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY);
-             setIsolated(primaryNodeOnly);
-         }
-     } finally {
-         writeLock.unlock();
-     }
- }
-
- /**
-  * Reads the configured scheduling strategy while holding the read lock.
-  *
-  * @return the currently configured scheduling strategy
-  */
- public SchedulingStrategy getSchedulingStrategy() {
-     final SchedulingStrategy current;
-     readLock.lock();
-     try {
-         current = this.schedulingStrategy;
-     } finally {
-         readLock.unlock();
-     }
-     return current;
- }
-
- /**
-  * @return the scheduling period as the string that was last stored
-  */
- @Override
- public String getSchedulingPeriod() {
-     final String period = schedulingPeriod.get();
-     return period;
- }
-
- /**
-  * Validates and stores the scheduling period according to the current
-  * scheduling strategy:
-  * <ul>
-  * <li>CRON_DRIVEN: the value must parse as a cron expression.</li>
-  * <li>TIMER_DRIVEN / PRIMARY_NODE_ONLY: the value is parsed as a time
-  * duration; the nanosecond period is stored, raised to
-  * MINIMUM_SCHEDULING_NANOS if smaller.</li>
-  * <li>EVENT_DRIVEN (or any other strategy): the value is ignored and
-  * nothing is stored.</li>
-  * </ul>
-  *
-  * @param schedulingPeriod the scheduling period exactly as entered by the
-  * user
-  * @throws IllegalStateException if the processor is running
-  * @throws IllegalArgumentException if the value is not a valid cron
-  * expression (CRON_DRIVEN) or is a negative duration (timer based)
-  * @throws NullPointerException if the value is null for a timer based
-  * strategy
-  */
- @Override
- public void setScheduldingPeriod(final String schedulingPeriod) {
-     writeLock.lock();
-     try {
-         if (isRunning()) {
-             throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
-         }
-
-         switch (schedulingStrategy) {
-             case CRON_DRIVEN: {
-                 try {
-                     // constructed only to validate the expression
-                     new CronExpression(schedulingPeriod);
-                 } catch (final Exception e) {
-                     // keep the parser's exception as the cause so the reason for rejection is not lost
-                     throw new IllegalArgumentException("Scheduling Period is not a valid cron expression: " + schedulingPeriod, e);
-                 }
-             }
-             break;
-             case PRIMARY_NODE_ONLY:
-             case TIMER_DRIVEN: {
-                 final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), TimeUnit.NANOSECONDS);
-                 if (schedulingNanos < 0) {
-                     throw new IllegalArgumentException("Scheduling Period must be positive");
-                 }
-                 this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos));
-             }
-             break;
-             case EVENT_DRIVEN:
-             default:
-                 // the period has no meaning here; leave the stored value untouched
-                 return;
-         }
-
-         this.schedulingPeriod.set(schedulingPeriod);
-     } finally {
-         writeLock.unlock();
-     }
- }
-
- /**
-  * Reports the run duration, held internally in nanoseconds, in the given
-  * unit. The value is read while holding the read lock.
-  *
-  * @param timeUnit the unit in which to report the run duration
-  * @return the run duration converted to the given unit
-  */
- @Override
- public long getRunDuration(final TimeUnit timeUnit) {
-     readLock.lock();
-     try {
-         final long nanos = this.runNanos;
-         return timeUnit.convert(nanos, TimeUnit.NANOSECONDS);
-     } finally {
-         readLock.unlock();
-     }
- }
-
- /**
-  * Stores the run duration, converted to nanoseconds, while holding the
-  * write lock.
-  *
-  * @param duration the run duration; must not be negative
-  * @param timeUnit the unit of the given duration
-  * @throws IllegalArgumentException if the duration is negative
-  */
- @Override
- public void setRunDuration(final long duration, final TimeUnit timeUnit) {
-     writeLock.lock();
-     try {
-         final boolean negative = duration < 0;
-         if (negative) {
-             throw new IllegalArgumentException("Run Duration must be non-negative value; cannot set to " + timeUnit.toSeconds(duration) + " seconds");
-         }
-
-         final long nanos = timeUnit.toNanos(duration);
-         this.runNanos = nanos;
-     } finally {
-         writeLock.unlock();
-     }
- }
-
- /**
-  * Parses the stored yield period string and reports it in the requested
-  * unit.
-  *
-  * @param timeUnit the unit in which to report the yield period. If null,
-  * {@link #DEFAULT_TIME_UNIT} is used
-  * @return the yield period in the requested unit
-  */
- @Override
- public long getYieldPeriod(final TimeUnit timeUnit) {
-     final TimeUnit effectiveUnit;
-     if (timeUnit == null) {
-         effectiveUnit = DEFAULT_TIME_UNIT;
-     } else {
-         effectiveUnit = timeUnit;
-     }
-     return FormatUtils.getTimeDuration(getYieldPeriod(), effectiveUnit);
- }
-
- /**
-  * @return the yield period as the string that was last stored
-  */
- public String getYieldPeriod() {
-     final String period = yieldPeriod.get();
-     return period;
- }
-
- /**
-  * Updates the amount of time that this processor should avoid being
-  * scheduled after it yields. The value is parsed to validate it, then
-  * stored as the original string. The change is made under the write lock.
-  *
-  * @param yieldPeriod the new yield period as a duration string
-  * @throws IllegalStateException if the processor is running
-  * @throws IllegalArgumentException if the duration parses to a negative
-  * value
-  * @throws NullPointerException if the given period is null
-  */
- @Override
- public void setYieldPeriod(final String yieldPeriod) {
-     writeLock.lock();
-     try {
-         final boolean running = isRunning();
-         if (running) {
-             throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
-         }
-
-         final String nonNullPeriod = requireNonNull(yieldPeriod);
-         final long parsedMillis = FormatUtils.getTimeDuration(nonNullPeriod, TimeUnit.MILLISECONDS);
-         if (parsedMillis < 0) {
-             throw new IllegalArgumentException("Yield duration must be positive");
-         }
-
-         this.yieldPeriod.set(yieldPeriod);
-     } finally {
-         writeLock.unlock();
-     }
- }
-
- /**
-  * Causes the processor not to be scheduled for the configured yield
-  * period, then logs the decision at debug level. The duration can be
-  * obtained via {@link #getYieldPeriod(TimeUnit)} and changed via
-  * {@link #setYieldPeriod(String)}.
-  */
- @Override
- public void yield() {
-     final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS);
-     yield(yieldMillis, TimeUnit.MILLISECONDS);
-
-     // Report whole seconds once the period exceeds one second, milliseconds otherwise.
-     final String yieldDuration;
-     if (yieldMillis > 1000) {
-         yieldDuration = (yieldMillis / 1000) + " seconds";
-     } else {
-         yieldDuration = yieldMillis + " milliseconds";
-     }
-
-     LoggerFactory.getLogger(processor.getClass()).debug("{} has chosen to yield its resources; will not be scheduled to run again for {}", processor, yieldDuration);
- }
-
- public void yield(final long period, final TimeUnit timeUnit) {
- final long yieldMillis = TimeUnit.MILLISECONDS.convert(period, timeUnit);
- yieldExpiration.set(Math.max(yieldExpiration.get(), System.currentTimeMillis() + yieldMillis));
-
- processScheduler.yield(this);
- }
-
- /**
-  * @return the time, in milliseconds since Epoch, at which this processor
-  * is to once again be scheduled
-  */
- @Override
- public long getYieldExpiration() {
-     final long expiration = yieldExpiration.get();
-     return expiration;
- }
-
- /**
-  * Parses the stored penalization period string and reports it in the
-  * requested unit.
-  *
-  * @param timeUnit the unit in which to report the period. If null,
-  * {@link #DEFAULT_TIME_UNIT} is used
-  * @return the penalization period in the requested unit
-  */
- @Override
- public long getPenalizationPeriod(final TimeUnit timeUnit) {
-     final TimeUnit effectiveUnit;
-     if (timeUnit == null) {
-         effectiveUnit = DEFAULT_TIME_UNIT;
-     } else {
-         effectiveUnit = timeUnit;
-     }
-     return FormatUtils.getTimeDuration(getPenalizationPeriod(), effectiveUnit);
- }
-
- /**
-  * @return the penalization period as the string that was last stored
-  */
- @Override
- public String getPenalizationPeriod() {
-     final String period = penalizationPeriod.get();
-     return period;
- }
-
- /**
-  * Updates the penalization period. The value is parsed to validate it,
-  * then stored as the original string. The change is made under the write
-  * lock.
-  *
-  * @param penalizationPeriod the new penalization period as a duration
-  * string
-  * @throws IllegalStateException if the processor is running
-  * @throws IllegalArgumentException if the duration parses to a negative
-  * value
-  * @throws NullPointerException if the given period is null
-  */
- @Override
- public void setPenalizationPeriod(final String penalizationPeriod) {
-     writeLock.lock();
-     try {
-         final boolean running = isRunning();
-         if (running) {
-             throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
-         }
-
-         final String nonNullPeriod = requireNonNull(penalizationPeriod);
-         final long parsedMillis = FormatUtils.getTimeDuration(nonNullPeriod, TimeUnit.MILLISECONDS);
-         if (parsedMillis < 0) {
-             throw new IllegalArgumentException("Penalization duration must be positive");
-         }
-
-         this.penalizationPeriod.set(penalizationPeriod);
-     } finally {
-         writeLock.unlock();
-     }
- }
-
- /**
-  * Determines the number of concurrent tasks that may be running for this
-  * processor. The value is validated but not stored when the processor is
-  * triggered serially.
-  *
-  * @param taskCount a number of concurrent tasks this processor may have
-  * running
-  * @throws IllegalStateException if the processor is running
-  * @throws IllegalArgumentException if the given value is less than 1 and
-  * the scheduling strategy is not EVENT_DRIVEN
-  */
- @Override
- public void setMaxConcurrentTasks(final int taskCount) {
-     writeLock.lock();
-     try {
-         if (isRunning()) {
-             throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
-         }
-         if (taskCount < 1 && getSchedulingStrategy() != SchedulingStrategy.EVENT_DRIVEN) {
-             // say what was rejected and why, rather than throwing a message-less exception
-             throw new IllegalArgumentException("Concurrent task count must be at least 1 unless the Processor is Event-Driven; cannot set to " + taskCount);
-         }
-         if (!triggeredSerially) {
-             concurrentTaskCount.set(taskCount);
-         }
-     } finally {
-         writeLock.unlock();
-     }
- }
-
- /**
-  * @return true if the processor class carries the {@link TriggerSerially}
-  * annotation, false otherwise
-  */
- public boolean isTriggeredSerially() {
-     return this.triggeredSerially;
- }
-
- /**
-  * @return the number of tasks that may execute concurrently for this
-  * processor
-  */
- @Override
- public int getMaxConcurrentTasks() {
-     final int maxTasks = concurrentTaskCount.get();
-     return maxTasks;
- }
-
- /**
- * Looks up the log repository registered under this node's identifier and
- * asks it for the observation level of the {@link #BULLETIN_OBSERVER_ID}
- * observer.
- *
- * @return the observation level reported by the log repository
- */
- public LogLevel getBulletinLevel() {
- return LogRepositoryFactory.getRepository(getIdentifier()).getObservationLevel(BULLETIN_OBSERVER_ID);
- }
-
- /**
- * Looks up the log repository registered under this node's identifier and
- * sets the observation level of the {@link #BULLETIN_OBSERVER_ID} observer.
- *
- * @param level the new observation level
- */
- public void setBulletinLevel(final LogLevel level) {
- LogRepositoryFactory.getRepository(getIdentifier()).setObservationLevel(BULLETIN_OBSERVER_ID, level);
- }
-
- /**
-  * Collects, under the read lock, every connection registered under any
-  * relationship into a new set.
-  *
-  * @return a new, caller-owned set containing all such connections
-  */
- @Override
- public Set<Connection> getConnections() {
-     final Set<Connection> merged = new HashSet<>();
-
-     readLock.lock();
-     try {
-         for (final Map.Entry<Relationship, Set<Connection>> entry : connections.entrySet()) {
-             merged.addAll(entry.getValue());
-         }
-     } finally {
-         readLock.unlock();
-     }
-
-     return merged;
- }
-
- /**
-  * @return the list of connections currently recorded as having this node
-  * as their destination
-  */
- @Override
- public List<Connection> getIncomingConnections() {
-     final List<Connection> incoming = incomingConnectionsRef.get();
-     return incoming;
- }
-
- /**
-  * Returns the connections registered under the given relationship.
-  *
-  * @param relationship the relationship whose connections are wanted
-  * @return an unmodifiable snapshot of the connections for the
-  * relationship, or an empty set if none are registered
-  */
- @Override
- public Set<Connection> getConnections(final Relationship relationship) {
-     readLock.lock();
-     try {
-         final Set<Connection> applicableConnections = connections.get(relationship);
-         if (applicableConnections == null) {
-             return Collections.<Connection>emptySet();
-         }
-
-         // Copy while the read lock is held. Handing out a view of the internal HashSet would let callers
-         // iterate it, unlocked, while addConnection/updateConnection/removeConnection mutate it.
-         return Collections.unmodifiableSet(new HashSet<>(applicableConnections));
-     } finally {
-         readLock.unlock();
-     }
- }
-
- /**
-  * Registers a connection for which this node is the source, the
-  * destination, or both (a self-loop). As destination, the connection is
-  * added to the incoming list; as source, it is registered under each of
-  * its relationships and those relationships stop being auto-terminated.
-  *
-  * @param connection the connection to add; must not be null
-  * @throws NullPointerException if the connection is null
-  * @throws IllegalStateException if this node is neither the source nor the
-  * destination of the connection
-  */
- @Override
- public void addConnection(final Connection connection) {
-     Objects.requireNonNull(connection, "connection cannot be null");
-
-     if (!connection.getSource().equals(this) && !connection.getDestination().equals(this)) {
-         throw new IllegalStateException("Cannot add a connection to a ProcessorNode for which the ProcessorNode is neither the Source nor the Destination");
-     }
-
-     writeLock.lock();
-     try {
-         List<Connection> updatedIncoming = null;
-         if (connection.getDestination().equals(this)) {
-             // don't add the connection twice. This may occur if we have a self-loop because we will be told
-             // to add the connection once because we are the source and again because we are the destination.
-             final List<Connection> incomingConnections = incomingConnectionsRef.get();
-             updatedIncoming = new ArrayList<>(incomingConnections);
-             if (!updatedIncoming.contains(connection)) {
-                 updatedIncoming.add(connection);
-             }
-         }
-
-         if (connection.getSource().equals(this)) {
-             // don't add the connection twice. This may occur if we have a self-loop because we will be told
-             // to add the connection once because we are the source and again because we are the destination.
-             if (!destinations.containsKey(connection)) {
-                 for (final Relationship relationship : connection.getRelationships()) {
-                     // resolve to the processor's own Relationship instance where one exists
-                     final Relationship rel = getRelationship(relationship.getName());
-                     Set<Connection> set = connections.get(rel);
-                     if (set == null) {
-                         set = new HashSet<>();
-                         connections.put(rel, set);
-                     }
-
-                     set.add(connection);
-
-                     destinations.put(connection, connection.getDestination());
-                 }
-
-                 final Set<Relationship> autoTerminated = this.undefinedRelationshipsToTerminate.get();
-                 if (autoTerminated != null) {
-                     // Publish a modified copy rather than mutating the shared set in place: the set is read
-                     // without any lock by isAutoTerminated() and exposed by getAutoTerminatedRelationships().
-                     final Set<Relationship> remaining = new HashSet<>(autoTerminated);
-                     remaining.removeAll(connection.getRelationships());
-                     this.undefinedRelationshipsToTerminate.set(remaining);
-                 }
-             }
-         }
-
-         if (updatedIncoming != null) {
-             incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming));
-         }
-     } finally {
-         writeLock.unlock();
-     }
- }
-
- /**
-  * @return true if at least one connection has this node as its
-  * destination
-  */
- @Override
- public boolean hasIncomingConnection() {
-     final List<Connection> incoming = incomingConnectionsRef.get();
-     return !incoming.isEmpty();
- }
-
- /**
-  * Re-synchronizes this node's bookkeeping with a connection whose
-  * relationships or destination may have changed. As source, the connection
-  * is re-registered under its current relationships and destination; as
-  * destination, it is refreshed in the incoming list.
-  *
-  * @param connection the connection that was modified; must not be null
-  * @throws NullPointerException if the connection is null
-  * @throws IllegalStateException if the update drops a relationship that
-  * this running processor still requires and that no other connection or
-  * auto-termination covers
-  */
- @Override
- public void updateConnection(final Connection connection) throws IllegalStateException {
-     if (requireNonNull(connection).getSource().equals(this)) {
-         writeLock.lock();
-         try {
-             //
-             // update any relationships
-             //
-             // first collect the relationships the connection was registered under before this update.
-             final List<Relationship> existingRelationships = new ArrayList<>();
-             for (final Map.Entry<Relationship, Set<Connection>> entry : connections.entrySet()) {
-                 if (entry.getValue().contains(connection)) {
-                     existingRelationships.add(entry.getKey());
-                 }
-             }
-
-             // A relationship was removed when it was registered before but is no longer listed by the
-             // connection. (Iterating the connection's current relationships instead would flag the ones
-             // that were added, not the ones that were removed.)
-             for (final Relationship rel : existingRelationships) {
-                 if (!connection.getRelationships().contains(rel)) {
-                     // relationship was removed. Check if this is legal.
-                     final Set<Connection> connectionsForRelationship = getConnections(rel);
-                     if (connectionsForRelationship != null && connectionsForRelationship.size() == 1 && this.isRunning() && !isAutoTerminated(rel) && getRelationships().contains(rel)) {
-                         // if we are running and we do not terminate undefined relationships and this is the only
-                         // connection that defines the given relationship, and that relationship is required,
-                         // then it is not legal to remove this relationship from this connection.
-                         throw new IllegalStateException("Cannot remove relationship " + rel.getName() + " from Connection because doing so would invalidate Processor " + this + ", which is currently running");
-                     }
-                 }
-             }
-
-             // remove the connection from any list that currently contains
-             for (final Set<Connection> list : connections.values()) {
-                 list.remove(connection);
-             }
-
-             // add the connection in for all relationships listed.
-             for (final Relationship rel : connection.getRelationships()) {
-                 Set<Connection> set = connections.get(rel);
-                 if (set == null) {
-                     set = new HashSet<>();
-                     connections.put(rel, set);
-                 }
-                 set.add(connection);
-             }
-
-             // update to the new destination
-             destinations.put(connection, connection.getDestination());
-
-             final Set<Relationship> autoTerminated = this.undefinedRelationshipsToTerminate.get();
-             if (autoTerminated != null) {
-                 // Publish a modified copy rather than mutating the shared set in place: the set is read
-                 // without any lock by isAutoTerminated() and exposed by getAutoTerminatedRelationships().
-                 final Set<Relationship> remaining = new HashSet<>(autoTerminated);
-                 remaining.removeAll(connection.getRelationships());
-                 this.undefinedRelationshipsToTerminate.set(remaining);
-             }
-         } finally {
-             writeLock.unlock();
-         }
-     }
-
-     if (connection.getDestination().equals(this)) {
-         writeLock.lock();
-         try {
-             // update our incoming connections -- we can just remove & re-add the connection to
-             // update the list.
-             final List<Connection> incomingConnections = incomingConnectionsRef.get();
-             final List<Connection> updatedIncoming = new ArrayList<>(incomingConnections);
-             updatedIncoming.remove(connection);
-             updatedIncoming.add(connection);
-             incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming));
-         } finally {
-             writeLock.unlock();
-         }
-     }
- }
-
- /**
- * Removes a connection from this node's bookkeeping. If this node is the
- * source, the connection is removed from every relationship's set and from
- * the destination map; if this node is the destination, the connection is
- * removed from the incoming list.
- *
- * @param connection the connection to remove; must not be null
- * @throws NullPointerException if the connection is null
- * @throws IllegalStateException if this node is the running source and, for
- * one of the connection's relationships, at most one connection is
- * registered
- * @throws IllegalArgumentException if nothing was removed, i.e. the
- * connection was neither a known outgoing nor a known incoming connection
- */
- @Override
- public void removeConnection(final Connection connection) {
- boolean connectionRemoved = false;
-
- if (requireNonNull(connection).getSource().equals(this)) {
- // This pre-check runs before the write lock is taken.
- for (final Relationship relationship : connection.getRelationships()) {
- final Set<Connection> connectionsForRelationship = getConnections(relationship);
- if ((connectionsForRelationship == null || connectionsForRelationship.size() <= 1) && isRunning()) {
- throw new IllegalStateException("This connection cannot be removed because its source is running and removing it will invalidate this processor");
- }
- }
-
- writeLock.lock();
- try {
- for (final Set<Connection> connectionList : this.connections.values()) {
- connectionList.remove(connection);
- }
-
- // true only if the connection was actually known as an outgoing connection
- connectionRemoved = (destinations.remove(connection) != null);
- } finally {
- writeLock.unlock();
- }
- }
-
- if (connection.getDestination().equals(this)) {
- writeLock.lock();
- try {
- final List<Connection> incomingConnections = incomingConnectionsRef.get();
- if (incomingConnections.contains(connection)) {
- final List<Connection> updatedIncoming = new ArrayList<>(incomingConnections);
- updatedIncoming.remove(connection);
- incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming));
- // Removal as destination counts as success; the finally block still releases the lock.
- return;
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- // Reached only when the connection was not removed from the incoming list.
- if (!connectionRemoved) {
- throw new IllegalArgumentException("Cannot remove a connection from a ProcessorNode for which the ProcessorNode is not the Source");
- }
- }
-
- /**
-  * Gets the relationship of this node's processor that has the given name,
-  * or creates a new relationship for the given name if the processor
-  * declares none that matches.
-  *
-  * @param relationshipName the name of the relationship to look up
-  * @return the processor's own matching relationship if one exists,
-  * otherwise a newly built relationship with the given name
-  */
- @Override
- public Relationship getRelationship(final String relationshipName) {
-     final Relationship specRel = new Relationship.Builder().name(relationshipName).build();
-
-     final Set<Relationship> relationships;
-     try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
-         relationships = processor.getRelationships();
-     }
-
-     // getUndefinedRelationships() treats a null result from the processor as possible; do the same here
-     // rather than failing with a NullPointerException in the loop below.
-     if (relationships == null) {
-         return specRel;
-     }
-
-     for (final Relationship rel : relationships) {
-         if (rel.equals(specRel)) {
-             return rel;
-         }
-     }
-     return specRel;
- }
-
- /**
-  * @return the processor instance wrapped by this node
-  */
- public Processor getProcessor() {
-     final Processor wrapped = this.processor;
-     return wrapped;
- }
-
- /**
-  * Obtains the Set of destinations for all outgoing connections, excluding
-  * any destination that is this node itself (self-loops). The destination
-  * map is read while holding the read lock.
-  *
-  * @return a new set of the destinations other than this node
-  */
- public Set<Connectable> getDestinations() {
-     readLock.lock();
-     try {
-         final Set<Connectable> others = new HashSet<>();
-         for (final Connectable candidate : destinations.values()) {
-             // identity comparison: skip only this very node
-             if (candidate == this) {
-                 continue;
-             }
-             others.add(candidate);
-         }
-         return others;
-     } finally {
-         readLock.unlock();
-     }
- }
-
- /**
-  * Obtains the destinations of the connections registered under the given
-  * relationship, reading the internal maps under the read lock.
-  *
-  * @param relationship the relationship of interest
-  * @return a new set of destinations; empty if the relationship has no
-  * connections
-  */
- public Set<Connectable> getDestinations(final Relationship relationship) {
-     readLock.lock();
-     try {
-         final Set<Connectable> result = new HashSet<>();
-         final Set<Connection> forRelationship = connections.get(relationship);
-         if (forRelationship == null) {
-             return result;
-         }
-
-         for (final Connection outgoing : forRelationship) {
-             final Connectable destination = destinations.get(outgoing);
-             result.add(destination);
-         }
-         return result;
-     } finally {
-         readLock.unlock();
-     }
- }
-
- /**
-  * Determines which of the processor's relationships have no connection
-  * registered. The processor is queried inside a NarCloseable scope and the
-  * connection map is read under the read lock.
-  *
-  * @return a new set of the relationships without any connection; empty if
-  * the processor reports no relationships
-  */
- public Set<Relationship> getUndefinedRelationships() {
-     final Set<Relationship> unconnected = new HashSet<>();
-     readLock.lock();
-     try {
-         final Set<Relationship> declared;
-         try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
-             declared = processor.getRelationships();
-         }
-
-         if (declared != null) {
-             for (final Relationship candidate : declared) {
-                 final Set<Connection> registered = this.connections.get(candidate);
-                 final boolean hasConnection = registered != null && !registered.isEmpty();
-                 if (!hasConnection) {
-                     unconnected.add(candidate);
-                 }
-             }
-         }
-     } finally {
-         readLock.unlock();
-     }
-     return unconnected;
- }
-
- /**
- * Determines if the given node is a destination for this node
- *
- * @param node
- * @return true if is a direct destination node; false otherwise
- */
- boolean isRelated(final ProcessorNode node) {
- readLock.lock();
- try {
- return this.destinations.containsValue(node);
- } finally {
- readLock.unlock();
- }
- }
-
-    /**
-     * @return true if the scheduled state is RUNNING or the scheduler still
-     * reports at least one active thread for this node
-     */
-    @Override
-    public boolean isRunning() {
-        readLock.lock();
-        try {
-            if (getScheduledState().equals(ScheduledState.RUNNING)) {
-                return true;
-            }
-            // not scheduled to run, but threads may still be finishing up
-            return processScheduler.getActiveThreadCount(this) > 0;
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * Runs the processor's validation and checks that every relationship
-     * without a connection is auto-terminated.
-     *
-     * @return true only if all validation results are valid and every
-     * undefined relationship is auto-terminated; false otherwise, including
-     * when anything is thrown while validating
-     */
-    @Override
-    public boolean isValid() {
-        readLock.lock();
-        try {
-            final ValidationContext context = validationContextFactory.newValidationContext(getProperties(), getAnnotationData());
-
-            final Collection<ValidationResult> outcomes;
-            try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
-                outcomes = getProcessor().validate(context);
-            }
-
-            for (final ValidationResult outcome : outcomes) {
-                if (!outcome.isValid()) {
-                    return false;
-                }
-            }
-
-            for (final Relationship unconnected : getUndefinedRelationships()) {
-                if (!isAutoTerminated(unconnected)) {
-                    return false;
-                }
-            }
-
-            return true;
-        } catch (final Throwable t) {
-            // any failure while validating is reported as "not valid"
-            return false;
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * Collects the reasons this processor is not valid: each invalid result
-     * returned by the processor's validation, plus one result per
-     * relationship that has no connection and is not auto-terminated.
-     *
-     * @return a newly created collection of invalid results; if validation
-     * itself throws, the collection holds a result describing that failure
-     */
-    @Override
-    public Collection<ValidationResult> getValidationErrors() {
-        final List<ValidationResult> problems = new ArrayList<>();
-        readLock.lock();
-        try {
-            final ValidationContext context = validationContextFactory.newValidationContext(getProperties(), getAnnotationData());
-
-            final Collection<ValidationResult> outcomes;
-            try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
-                outcomes = getProcessor().validate(context);
-            }
-
-            for (final ValidationResult outcome : outcomes) {
-                if (outcome.isValid()) {
-                    continue;
-                }
-                problems.add(outcome);
-            }
-
-            for (final Relationship unconnected : getUndefinedRelationships()) {
-                if (isAutoTerminated(unconnected)) {
-                    continue;
-                }
-
-                final ValidationResult.Builder builder = new ValidationResult.Builder();
-                builder.explanation("Relationship '" + unconnected.getName() + "' is not connected to any component and is not auto-terminated");
-                builder.subject("Relationship " + unconnected.getName());
-                builder.valid(false);
-                problems.add(builder.build());
-            }
-        } catch (final Throwable t) {
-            problems.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString()).valid(false).build());
-        } finally {
-            readLock.unlock();
-        }
-        return problems;
-    }
-
-    /**
-     * Two nodes are equal when the other object is a ProcessorNode whose
-     * identifier equals this node's identifier.
-     *
-     * @param other the object to compare against
-     * @return true if other is a ProcessorNode with the same identifier
-     */
-    @Override
-    public boolean equals(final Object other) {
-        if (other instanceof ProcessorNode) {
-            final ProcessorNode that = (ProcessorNode) other;
-            final EqualsBuilder comparison = new EqualsBuilder();
-            comparison.append(identifier.get(), that.getIdentifier());
-            return comparison.isEquals();
-        }
-        return false;
-    }
-
-    /**
-     * Computes the hash from the identifier value, the same value that
-     * {@link #equals(Object)} compares. Hashing the holder object instead of
-     * the value it holds would let two equal nodes produce different hash
-     * codes, which breaks hash-based collections.
-     *
-     * @return a hash code derived from this node's identifier
-     */
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder(7, 67).append(identifier.get()).toHashCode();
-    }
-
-    /**
-     * @return the relationships reported by the processor, obtained while
-     * the NarCloseable scope is open
-     */
-    @Override
-    public Collection<Relationship> getRelationships() {
-        final Collection<Relationship> declared;
-        try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
-            declared = getProcessor().getRelationships();
-        }
-        return declared;
-    }
-
-    /**
-     * @return the processor's own string representation, obtained while the
-     * NarCloseable scope is open
-     */
-    @Override
-    public String toString() {
-        final String description;
-        try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
-            description = getProcessor().toString();
-        }
-        return description;
-    }
-
-    /**
-     * @return the ProcessGroup currently assigned to this node
-     */
-    @Override
-    public ProcessGroup getProcessGroup() {
-        final ProcessGroup current = processGroup.get();
-        return current;
-    }
-
-    /**
-     * Assigns this node to the given ProcessGroup while holding the write
-     * lock.
-     *
-     * @param group the group to assign
-     */
-    @Override
-    public void setProcessGroup(final ProcessGroup group) {
-        writeLock.lock();
-        try {
-            processGroup.set(group);
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    /**
-     * Hands the trigger to the processor, invoking it while the NarCloseable
-     * scope is open.
-     *
-     * @param context the process context passed through to the processor
-     * @param sessionFactory the session factory passed through to the
-     * processor
-     */
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
-        try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
-            this.processor.onTrigger(context, sessionFactory);
-        }
-    }
-
-    /**
-     * @return always {@link ConnectableType#PROCESSOR}
-     */
-    @Override
-    public ConnectableType getConnectableType() {
-        final ConnectableType type = ConnectableType.PROCESSOR;
-        return type;
-    }
-
-    /**
-     * Records the new scheduled state. For any state other than RUNNING the
-     * yield expiration is reset to 0.
-     *
-     * @param scheduledState the state to record
-     */
-    public void setScheduledState(final ScheduledState scheduledState) {
-        this.scheduledState.set(scheduledState);
-
-        final boolean running = scheduledState.equals(ScheduledState.RUNNING);
-        if (running) {
-            return;
-        }
-
-        // if user stops processor, clear yield expiration
-        yieldExpiration.set(0L);
-    }
-
-    /**
-     * Replaces the annotation data, which is only permitted while the
-     * processor is not running.
-     *
-     * @param data the new annotation data
-     * @throws IllegalStateException if the processor is running
-     */
-    @Override
-    public void setAnnotationData(final String data) {
-        writeLock.lock();
-        try {
-            final boolean running = isRunning();
-            if (running) {
-                throw new IllegalStateException("Cannot set AnnotationData while processor is running");
-            }
-
-            annotationData.set(data);
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    /**
-     * @return the annotation data currently stored for this node
-     */
-    @Override
-    public String getAnnotationData() {
-        final String current = annotationData.get();
-        return current;
-    }
-
-    /**
-     * Delegates validation of the given context to the processor. The call
-     * is made inside a NarCloseable scope, matching how this class invokes
-     * the processor's validate method in isValid() and getValidationErrors().
-     *
-     * @param validationContext the context to validate
-     * @return the validation results produced by the processor
-     */
-    @Override
-    public Collection<ValidationResult> validate(final ValidationContext validationContext) {
-        try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
-            return processor.validate(validationContext);
-        }
-    }
-
-    /**
-     * Verifies that this node can be deleted, taking its connections into
-     * account.
-     *
-     * @throws IllegalStateException if the node cannot be deleted
-     */
-    @Override
-    public void verifyCanDelete() throws IllegalStateException {
-        final boolean ignoreConnections = false;
-        verifyCanDelete(ignoreConnections);
-    }
-
-    /**
-     * Verifies that this node can be deleted. The node must not be running.
-     * Unless connections are ignored, every outgoing connection and every
-     * incoming self-loop must itself be deletable, and no other component may
-     * have this node as its destination.
-     *
-     * @param ignoreConnections true to skip all connection checks
-     * @throws IllegalStateException if the node is running or is the
-     * destination of another component
-     */
-    @Override
-    public void verifyCanDelete(final boolean ignoreConnections) {
-        readLock.lock();
-        try {
-            if (isRunning()) {
-                throw new IllegalStateException(this + " is running");
-            }
-
-            if (ignoreConnections) {
-                return;
-            }
-
-            // outgoing connections, grouped by relationship
-            for (final Set<Connection> outgoingGroup : connections.values()) {
-                for (final Connection outgoing : outgoingGroup) {
-                    outgoing.verifyCanDelete();
-                }
-            }
-
-            // incoming connections are acceptable only when they are self-loops
-            for (final Connection incoming : incomingConnectionsRef.get()) {
-                if (!incoming.getSource().equals(this)) {
-                    throw new IllegalStateException(this + " is the destination of another component");
-                }
-                incoming.verifyCanDelete();
-            }
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * Verifies that this node can be started: it must be STOPPED, have no
-     * active threads, and be valid.
-     *
-     * @throws IllegalStateException if any of those conditions does not hold
-     */
-    @Override
-    public void verifyCanStart() {
-        readLock.lock();
-        try {
-            final ScheduledState current = scheduledState.get();
-            if (current != ScheduledState.STOPPED) {
-                throw new IllegalStateException(this + " is not stopped");
-            }
-
-            verifyNoActiveThreads();
-
-            final boolean valid = isValid();
-            if (!valid) {
-                throw new IllegalStateException(this + " is not in a valid state");
-            }
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * Verifies that this node can be stopped, which requires its scheduled
-     * state to be RUNNING.
-     *
-     * @throws IllegalStateException if the node is not scheduled to run
-     */
-    @Override
-    public void verifyCanStop() {
-        final ScheduledState current = getScheduledState();
-        if (current == ScheduledState.RUNNING) {
-            return;
-        }
-        throw new IllegalStateException(this + " is not scheduled to run");
-    }
-
-    /**
-     * Verifies that this node can be updated, which requires that it is not
-     * running.
-     *
-     * @throws IllegalStateException if the node is running
-     */
-    @Override
-    public void verifyCanUpdate() {
-        readLock.lock();
-        try {
-            final boolean running = isRunning();
-            if (running) {
-                throw new IllegalStateException(this + " is not stopped");
-            }
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * Verifies that this node can be enabled: it must be DISABLED and have
-     * no active threads.
-     *
-     * @throws IllegalStateException if either condition does not hold
-     */
-    @Override
-    public void verifyCanEnable() {
-        readLock.lock();
-        try {
-            final ScheduledState current = getScheduledState();
-            if (current != ScheduledState.DISABLED) {
-                throw new IllegalStateException(this + " is not disabled");
-            }
-
-            verifyNoActiveThreads();
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * Verifies that this node can be disabled: it must be STOPPED and have
-     * no active threads.
-     *
-     * @throws IllegalStateException if either condition does not hold
-     */
-    @Override
-    public void verifyCanDisable() {
-        readLock.lock();
-        try {
-            final ScheduledState current = getScheduledState();
-            if (current != ScheduledState.STOPPED) {
-                throw new IllegalStateException(this + " is not stopped");
-            }
-
-            verifyNoActiveThreads();
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * Asks the scheduler how many threads are active for this node and fails
-     * if there are any.
-     *
-     * @throws IllegalStateException if at least one thread is still active
-     */
-    private void verifyNoActiveThreads() throws IllegalStateException {
-        final int active = processScheduler.getActiveThreadCount(this);
-        if (active <= 0) {
-            return;
-        }
-        throw new IllegalStateException(this + " has " + active + " threads still active");
-    }
-
-    /**
-     * Verifies that the processor's configuration may be modified, which is
-     * only allowed while it is not running.
-     *
-     * @throws IllegalStateException if the processor is running
-     */
-    @Override
-    public void verifyModifiable() throws IllegalStateException {
-        if (!isRunning()) {
-            return;
-        }
-        throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
-    }
-}
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardSnippet.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardSnippet.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardSnippet.java
deleted file mode 100644
index 2c9d85e..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardSnippet.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-import javax.xml.bind.annotation.XmlRootElement;
-
-/**
- * A data flow snippet: an id, the id of the parent group, a "linked" flag and
- * the ids of the components that make up the snippet, grouped by component
- * type.
- */
-@XmlRootElement(name = "snippet")
-public class StandardSnippet implements Snippet {
-
-    private String id;
-    private String parentGroupId;
-    private Boolean linked;
-
-    private Set<String> processGroups = new HashSet<>();
-    private Set<String> remoteProcessGroups = new HashSet<>();
-    private Set<String> processors = new HashSet<>();
-    private Set<String> inputPorts = new HashSet<>();
-    private Set<String> outputPorts = new HashSet<>();
-    private Set<String> connections = new HashSet<>();
-    private Set<String> labels = new HashSet<>();
-    private Set<String> funnels = new HashSet<>();
-
-    /**
-     * @return the id of this snippet
-     */
-    public String getId() {
-        return this.id;
-    }
-
-    /**
-     * @param id the id to assign to this snippet
-     */
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    /**
-     * @return true if this snippet is linked to the data flow; false if it is
-     * not, or if the flag was never set
-     */
-    public boolean isLinked() {
-        return linked != null && linked;
-    }
-
-    /**
-     * @param linked whether this snippet is linked to the data flow; may be
-     * null, which is reported as not linked
-     */
-    public void setLinked(Boolean linked) {
-        this.linked = linked;
-    }
-
-    /**
-     * @return the id of the group that is the parent of the components in
-     * this snippet
-     */
-    public String getParentGroupId() {
-        return this.parentGroupId;
-    }
-
-    /**
-     * @param parentGroupId the id of the parent group
-     */
-    public void setParentGroupId(String parentGroupId) {
-        this.parentGroupId = parentGroupId;
-    }
-
-    /**
-     * @return an unmodifiable view of the connection ids in this snippet
-     */
-    public Set<String> getConnections() {
-        final Set<String> view = Collections.unmodifiableSet(connections);
-        return view;
-    }
-
-    /**
-     * @param ids connection ids to add to this snippet
-     */
-    public void addConnections(Collection<String> ids) {
-        this.connections.addAll(ids);
-    }
-
-    /**
-     * @return an unmodifiable view of the funnel ids in this snippet
-     */
-    public Set<String> getFunnels() {
-        final Set<String> view = Collections.unmodifiableSet(funnels);
-        return view;
-    }
-
-    /**
-     * @param ids funnel ids to add to this snippet
-     */
-    public void addFunnels(Collection<String> ids) {
-        this.funnels.addAll(ids);
-    }
-
-    /**
-     * @return an unmodifiable view of the input port ids in this snippet
-     */
-    public Set<String> getInputPorts() {
-        final Set<String> view = Collections.unmodifiableSet(inputPorts);
-        return view;
-    }
-
-    /**
-     * @param ids input port ids to add to this snippet
-     */
-    public void addInputPorts(Collection<String> ids) {
-        this.inputPorts.addAll(ids);
-    }
-
-    /**
-     * @return an unmodifiable view of the output port ids in this snippet
-     */
-    public Set<String> getOutputPorts() {
-        final Set<String> view = Collections.unmodifiableSet(outputPorts);
-        return view;
-    }
-
-    /**
-     * @param ids output port ids to add to this snippet
-     */
-    public void addOutputPorts(Collection<String> ids) {
-        this.outputPorts.addAll(ids);
-    }
-
-    /**
-     * @return an unmodifiable view of the label ids in this snippet
-     */
-    public Set<String> getLabels() {
-        final Set<String> view = Collections.unmodifiableSet(labels);
-        return view;
-    }
-
-    /**
-     * @param ids label ids to add to this snippet
-     */
-    public void addLabels(Collection<String> ids) {
-        this.labels.addAll(ids);
-    }
-
-    /**
-     * @return an unmodifiable view of the process group ids in this snippet
-     */
-    public Set<String> getProcessGroups() {
-        final Set<String> view = Collections.unmodifiableSet(processGroups);
-        return view;
-    }
-
-    /**
-     * @param ids process group ids to add to this snippet
-     */
-    public void addProcessGroups(Collection<String> ids) {
-        this.processGroups.addAll(ids);
-    }
-
-    /**
-     * @return an unmodifiable view of the processor ids in this snippet
-     */
-    public Set<String> getProcessors() {
-        final Set<String> view = Collections.unmodifiableSet(processors);
-        return view;
-    }
-
-    /**
-     * @param ids processor ids to add to this snippet
-     */
-    public void addProcessors(Collection<String> ids) {
-        this.processors.addAll(ids);
-    }
-
-    /**
-     * @return an unmodifiable view of the remote process group ids in this
-     * snippet
-     */
-    public Set<String> getRemoteProcessGroups() {
-        final Set<String> view = Collections.unmodifiableSet(remoteProcessGroups);
-        return view;
-    }
-
-    /**
-     * @param ids remote process group ids to add to this snippet
-     */
-    public void addRemoteProcessGroups(Collection<String> ids) {
-        this.remoteProcessGroups.addAll(ids);
-    }
-
-    /**
-     * @return true if this snippet holds no component ids of any type
-     */
-    public boolean isEmpty() {
-        if (!processors.isEmpty() || !processGroups.isEmpty() || !remoteProcessGroups.isEmpty() || !labels.isEmpty()) {
-            return false;
-        }
-        return inputPorts.isEmpty() && outputPorts.isEmpty() && connections.isEmpty() && funnels.isEmpty();
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/Template.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/Template.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/Template.java
deleted file mode 100644
index 26c4cd2..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/Template.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import org.apache.nifi.web.api.dto.TemplateDTO;
-
-/**
- * Thin holder around the TemplateDTO that describes a template.
- */
-public class Template {
-
-    private final TemplateDTO details;
-
-    /**
-     * @param dto the DTO describing the contents of this Template; it is
-     * stored as given, not copied
-     */
-    public Template(final TemplateDTO dto) {
-        details = dto;
-    }
-
-    /**
-     * @return the TemplateDTO that was supplied at construction and that
-     * describes the contents of this Template
-     */
-    public TemplateDTO getDetails() {
-        return details;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/TemplateManager.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/TemplateManager.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/TemplateManager.java
deleted file mode 100644
index aa095d1..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/TemplateManager.java
+++ /dev/null
@@ -1,507 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import static java.util.Objects.requireNonNull;
-
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.nifi.stream.io.ByteArrayInputStream;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
-import org.apache.nifi.stream.io.DataOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.persistence.TemplateDeserializer;
-import org.apache.nifi.persistence.TemplateSerializer;
-import org.apache.nifi.web.api.dto.ConnectableDTO;
-import org.apache.nifi.web.api.dto.ConnectionDTO;
-import org.apache.nifi.web.api.dto.FlowSnippetDTO;
-import org.apache.nifi.web.api.dto.ProcessGroupDTO;
-import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
-import org.apache.nifi.web.api.dto.ProcessorConfigDTO.PropertyDescriptorDTO;
-import org.apache.nifi.web.api.dto.ProcessorDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
-import org.apache.nifi.web.api.dto.TemplateDTO;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Keeps track of the known Templates. Templates are held in an in-memory map
- * keyed by template id and are also stored as "&lt;id&gt;.template" files in
- * a storage directory. Access to the map is guarded by a read/write lock.
- */
-public class TemplateManager {
-
-    private static final Logger logger = LoggerFactory.getLogger(TemplateManager.class);
-
-    private final Path directory;
-    private final Map<String, Template> templateMap = new HashMap<>();
-
-    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
-    private final Lock readLock = rwLock.readLock();
-    private final Lock writeLock = rwLock.writeLock();
-
-    // accepts files whose name ends with ".template", ignoring case
-    private final FileFilter templateFileFilter = new FileFilter() {
-        @Override
-        public boolean accept(File pathname) {
-            return pathname.getName().toLowerCase().endsWith(".template");
-        }
-    };
-
-    /**
-     * Creates a manager that stores its templates in the given location. The
-     * directory is created if it does not exist yet.
-     *
-     * @param storageLocation the directory in which template files are kept
-     * @throws IOException if the directory cannot be created, or if it exists
-     * but is not both executable and writable
-     * @throws IllegalArgumentException if the location exists but is not a
-     * directory
-     */
-    public TemplateManager(final Path storageLocation) throws IOException {
-        directory = storageLocation;
-        if (!Files.exists(directory)) {
-            Files.createDirectories(directory);
-        } else {
-            if (!Files.isDirectory(directory)) {
-                throw new IllegalArgumentException(directory.toString() + " is not a directory");
-            }
-            // use toFile().canXXX, rather than Files.is... because on Windows 7, we sometimes get the wrong result for Files.is... (running Java 7 update 9)
-            if (!directory.toFile().canExecute() || !directory.toFile().canWrite()) {
-                throw new IOException("Invalid permissions for directory " + directory.toString());
-            }
-        }
-    }
-
-    /**
-     * Adds a template to this manager. The contents of this template must be
-     * part of the current flow. This is going to create a template based on a
-     * snippet of this flow. Any sensitive properties in the TemplateDTO will
-     * be removed.
-     *
-     * @param dto the template to add; it is scrubbed in place
-     * @return the Template that was created for the given DTO
-     * @throws IOException if an I/O error occurs when persisting the Template
-     * @throws NullPointerException if the DTO is null
-     * @throws IllegalArgumentException if does not contain all required
-     * information, such as the template name or a processor's configuration
-     * element
-     */
-    public Template addTemplate(final TemplateDTO dto) throws IOException {
-        scrubTemplate(dto.getSnippet());
-        return importTemplate(dto);
-    }
-
-    /**
-     * Checks that the given template may be imported: it must have a snippet
-     * and a non-blank name, and no known template may already use that name.
-     * Takes the read lock, which ReentrantReadWriteLock also permits for a
-     * thread that already holds the write lock.
-     *
-     * @param dto the template to check
-     * @throws IllegalArgumentException if the DTO or its snippet is null, or
-     * the name is blank
-     * @throws IllegalStateException if a template with the same name exists
-     */
-    private void verifyCanImport(final TemplateDTO dto) {
-        // ensure the template is specified
-        if (dto == null || dto.getSnippet() == null) {
-            throw new IllegalArgumentException("Template details not specified.");
-        }
-
-        // ensure the name is specified
-        if (StringUtils.isBlank(dto.getName())) {
-            throw new IllegalArgumentException("Template name cannot be blank.");
-        }
-
-        readLock.lock();
-        try {
-            for (final Template template : templateMap.values()) {
-                final TemplateDTO existingDto = template.getDetails();
-
-                // ensure a template with this name doesn't already exist
-                if (dto.getName().equals(existingDto.getName())) {
-                    throw new IllegalStateException(String.format("A template named '%s' already exists.", dto.getName()));
-                }
-            }
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * Clears all Templates from the TemplateManager and deletes their files
-     * from the storage directory.
-     *
-     * @throws IOException if a template file still exists after repeated
-     * attempts to delete it
-     */
-    public void clear() throws IOException {
-        writeLock.lock();
-        try {
-            templateMap.clear();
-
-            final File[] files = directory.toFile().listFiles(templateFileFilter);
-            if (files == null) {
-                return;
-            }
-
-            for (final File file : files) {
-                boolean successful = false;
-
-                // try a few times before giving up on the file
-                for (int i = 0; i < 10; i++) {
-                    if (file.delete()) {
-                        successful = true;
-                        break;
-                    }
-                }
-
-                if (!successful && file.exists()) {
-                    throw new IOException("Failed to delete template file " + file.getAbsolutePath());
-                }
-            }
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    /**
-     * Returns the template with the given id, if it exists; else, returns null
-     *
-     * @param id the id of the template to look up
-     * @return the matching Template, or null if none is known
-     */
-    public Template getTemplate(final String id) {
-        readLock.lock();
-        try {
-            return templateMap.get(id);
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * Loads the templates from disk, adding each deserialized template to the
-     * in-memory map under its id.
-     *
-     * @throws IOException if a template file cannot be read
-     */
-    public void loadTemplates() throws IOException {
-        writeLock.lock();
-        try {
-            final File[] files = directory.toFile().listFiles(templateFileFilter);
-            if (files == null) {
-                return;
-            }
-
-            for (final File file : files) {
-                try (final FileInputStream fis = new FileInputStream(file);
-                        final BufferedInputStream bis = new BufferedInputStream(fis)) {
-                    final TemplateDTO templateDto = TemplateDeserializer.deserialize(bis);
-                    templateMap.put(templateDto.getId(), new Template(templateDto));
-                }
-            }
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    /**
-     * Imports the given template as-is: verifies it, assigns a random id if
-     * it has none, persists it to disk and registers it in the map.
-     *
-     * @param dto the template to import
-     * @return the Template created for the given DTO
-     * @throws IOException if the template cannot be persisted
-     * @throws IllegalArgumentException if the DTO or its snippet is null, or
-     * the name is blank
-     * @throws IllegalStateException if a template with the same name exists
-     */
-    public Template importTemplate(final TemplateDTO dto) throws IOException {
-        writeLock.lock();
-        try {
-            // Verify while holding the write lock so that the name check and
-            // the registration below are one atomic step; otherwise two
-            // concurrent imports of the same name could both pass the check.
-            verifyCanImport(dto);
-
-            if (dto.getId() == null) {
-                dto.setId(UUID.randomUUID().toString());
-            }
-
-            final Template template = new Template(dto);
-            persistTemplate(template);
-
-            templateMap.put(dto.getId(), template);
-            return template;
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    /**
-     * Persists the given template to disk as "&lt;id&gt;.template" in the
-     * storage directory, replacing any previous content of that file.
-     *
-     * @param template the template to persist
-     * @throws IOException if the file cannot be written
-     */
-    private void persistTemplate(final Template template) throws IOException {
-        final Path path = directory.resolve(template.getDetails().getId() + ".template");
-        // TRUNCATE_EXISTING matters when the file already exists: without it a
-        // shorter serialization would leave stale bytes of the old content behind.
-        Files.write(path, TemplateSerializer.serialize(template.getDetails()),
-                StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
-    }
-
-    /**
-     * Scrubs the template prior to persisting in order to remove fields that
-     * shouldn't be included or are unnecessary.
-     *
-     * @param snippet the snippet to scrub in place; may be null
-     */
-    private void scrubTemplate(final FlowSnippetDTO snippet) {
-        // ensure that contents have been specified
-        if (snippet != null) {
-            // go through each processor if specified
-            if (snippet.getProcessors() != null) {
-                scrubProcessors(snippet.getProcessors());
-            }
-
-            // go through each connection if specified
-            if (snippet.getConnections() != null) {
-                scrubConnections(snippet.getConnections());
-            }
-
-            // go through each remote process group if specified
-            if (snippet.getRemoteProcessGroups() != null) {
-                scrubRemoteProcessGroups(snippet.getRemoteProcessGroups());
-            }
-
-            // go through each process group if specified
-            if (snippet.getProcessGroups() != null) {
-                scrubProcessGroups(snippet.getProcessGroups());
-            }
-        }
-    }
-
-    /**
-     * Scrubs process groups prior to saving by scrubbing the contents of each
-     * group.
-     *
-     * @param processGroups the process groups to scrub
-     */
-    private void scrubProcessGroups(final Set<ProcessGroupDTO> processGroups) {
-        // go through each process group
-        for (final ProcessGroupDTO processGroupDTO : processGroups) {
-            scrubTemplate(processGroupDTO.getContents());
-        }
-    }
-
-    /**
-     * Scrubs processors prior to saving. This includes removing sensitive
-     * properties, validation errors, property descriptors, etc.
-     *
-     * @param processors the processors to scrub
-     */
-    private void scrubProcessors(final Set<ProcessorDTO> processors) {
-        // go through each processor
-        for (final ProcessorDTO processorDTO : processors) {
-            final ProcessorConfigDTO processorConfig = processorDTO.getConfig();
-
-            // ensure that some property configuration have been specified
-            if (processorConfig != null) {
-                // if properties have been specified, remove sensitive ones
-                if (processorConfig.getProperties() != null) {
-                    Map<String, String> processorProperties = processorConfig.getProperties();
-
-                    // look for sensitive properties and null out their values
-                    if (processorConfig.getDescriptors() != null) {
-                        final Collection<PropertyDescriptorDTO> descriptors = processorConfig.getDescriptors().values();
-                        for (PropertyDescriptorDTO descriptor : descriptors) {
-                            if (descriptor.isSensitive()) {
-                                processorProperties.put(descriptor.getName(), null);
-                            }
-                        }
-                    }
-                }
-
-                processorConfig.setDescriptors(null);
-                processorConfig.setCustomUiUrl(null);
-            }
-
-            // remove validation errors
-            processorDTO.setValidationErrors(null);
-        }
-    }
-
-    /**
-     * Scrubs connections prior to saving. This includes removing available
-     * relationships.
-     *
-     * @param connections the connections to scrub
-     */
-    private void scrubConnections(final Set<ConnectionDTO> connections) {
-        // go through each connection
-        for (final ConnectionDTO connectionDTO : connections) {
-            connectionDTO.setAvailableRelationships(null);
-
-            scrubConnectable(connectionDTO.getSource());
-            scrubConnectable(connectionDTO.getDestination());
-        }
-    }
-
-    /**
-     * Remove unnecessary fields in connectables prior to saving.
-     *
-     * @param connectable the connectable to scrub; may be null
-     */
-    private void scrubConnectable(final ConnectableDTO connectable) {
-        if (connectable != null) {
-            connectable.setComments(null);
-            connectable.setExists(null);
-            connectable.setRunning(null);
-            connectable.setTransmitting(null);
-            connectable.setName(null);
-        }
-    }
-
-    /**
-     * Remove unnecessary fields in remote groups prior to saving.
-     *
-     * @param remoteGroups the remote process groups to scrub
-     */
-    private void scrubRemoteProcessGroups(final Set<RemoteProcessGroupDTO> remoteGroups) {
-        // go through each remote process group
-        for (final RemoteProcessGroupDTO remoteProcessGroupDTO : remoteGroups) {
-            remoteProcessGroupDTO.setFlowRefreshed(null);
-            remoteProcessGroupDTO.setInputPortCount(null);
-            remoteProcessGroupDTO.setOutputPortCount(null);
-            remoteProcessGroupDTO.setTransmitting(null);
-
-            // if this remote process group has contents
-            if (remoteProcessGroupDTO.getContents() != null) {
-                RemoteProcessGroupContentsDTO contents = remoteProcessGroupDTO.getContents();
-
-                // scrub any remote input ports
-                if (contents.getInputPorts() != null) {
-                    scrubRemotePorts(contents.getInputPorts());
-                }
-
-                // scrub any remote output ports
-                if (contents.getOutputPorts() != null) {
-                    scrubRemotePorts(contents.getOutputPorts());
-                }
-            }
-        }
-    }
-
-    /**
-     * Remove unnecessary fields in remote ports prior to saving. Ports that
-     * are not connected are removed from the given set altogether.
-     *
-     * @param remotePorts the remote ports to scrub; modified in place
-     */
-    private void scrubRemotePorts(final Set<RemoteProcessGroupPortDTO> remotePorts) {
-        for (final Iterator<RemoteProcessGroupPortDTO> remotePortIter = remotePorts.iterator(); remotePortIter.hasNext();) {
-            final RemoteProcessGroupPortDTO remotePortDTO = remotePortIter.next();
-
-            // if the flow is not connected to this remote port, remove it
-            if (remotePortDTO.isConnected() == null || !remotePortDTO.isConnected().booleanValue()) {
-                remotePortIter.remove();
-                continue;
-            }
-
-            remotePortDTO.setExists(null);
-            remotePortDTO.setTargetRunning(null);
-        }
-    }
-
-    /**
-     * Removes the template with the given ID, both from the map and from the
-     * storage directory. If the file exists but cannot be deleted, the
-     * template is put back into the map before the exception is rethrown.
-     *
-     * @param id the ID of the template to remove
-     * @throws NullPointerException if the argument is null
-     * @throws IllegalStateException if no template exists with the given ID
-     * @throws IOException if template could not be removed
-     */
-    public void removeTemplate(final String id) throws IOException, IllegalStateException {
-        writeLock.lock();
-        try {
-            final Template removed = templateMap.remove(requireNonNull(id));
-
-            // ensure the template exists
-            if (removed == null) {
-                throw new IllegalStateException("No template with ID " + id + " exists");
-            } else {
-
-                try {
-                    // remove the template from the archive directory
-                    final Path path = directory.resolve(removed.getDetails().getId() + ".template");
-                    Files.delete(path);
-                } catch (final NoSuchFileException e) {
-                    logger.warn(String.format("Template file for template %s not found when attempting to remove. Continuing...", id));
-                } catch (final IOException e) {
-                    // include the exception so the reason for the failure is not lost
-                    logger.error(String.format("Unable to remove template file for template %s.", id), e);
-
-                    // since the template file existed and we were unable to remove it, rollback
-                    // by returning it to the template map
-                    templateMap.put(id, removed);
-
-                    // rethrow
-                    throw e;
-                }
-            }
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    /**
-     * @return a new Set holding all templates currently known to this manager
-     */
-    public Set<Template> getTemplates() {
-        readLock.lock();
-        try {
-            return new HashSet<>(templateMap.values());
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * Parses templates from bytes laid out the way {@link #export()} writes
-     * them: for each template a 4-byte length followed by that many bytes of
-     * serialized template.
-     *
-     * @param bytes the bytes to parse
-     * @return the templates found, in the order they appear in the bytes
-     */
-    public static List<Template> parseBytes(final byte[] bytes) {
-        final List<Template> templates = new ArrayList<>();
-
-        try (final InputStream rawIn = new ByteArrayInputStream(bytes);
-                final DataInputStream in = new DataInputStream(rawIn)) {
-
-            while (isMoreData(in)) {
-                final int length = in.readInt();
-                final byte[] buffer = new byte[length];
-                StreamUtils.fillBuffer(in, buffer, true);
-                final TemplateDTO dto = TemplateDeserializer.deserialize(new ByteArrayInputStream(buffer));
-                templates.add(new Template(dto));
-            }
-        } catch (final IOException e) {
-            throw new RuntimeException("Could not parse bytes", e); // won't happen because of the types of streams being used
-        }
-
-        return templates;
-    }
-
-    /**
-     * Peeks at the stream to see whether at least one more byte is available,
-     * using mark/reset so that the byte is not consumed.
-     *
-     * @param in the stream to check
-     * @return true if another byte can be read; false at end of stream
-     * @throws IOException if reading or resetting the stream fails
-     */
-    private static boolean isMoreData(final InputStream in) throws IOException {
-        in.mark(1);
-        final int nextByte = in.read();
-        if (nextByte == -1) {
-            return false;
-        }
-
-        in.reset();
-        return true;
-    }
-
-    /**
-     * Serializes all known templates into a single byte array: for each
-     * template a 4-byte length followed by the serialized template.
-     *
-     * @return the exported bytes, or null if an IOException occurs while
-     * writing
-     */
-    public byte[] export() {
-        readLock.lock();
-        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                final DataOutputStream dos = new DataOutputStream(baos)) {
-            for (final Template template : templateMap.values()) {
-                final TemplateDTO dto = template.getDetails();
-                final byte[] bytes = TemplateSerializer.serialize(dto);
-                dos.writeInt(bytes.length);
-                dos.write(bytes);
-            }
-
-            return baos.toByteArray();
-        } catch (final IOException e) {
-            // won't happen
-            return null;
-        } finally {
-            readLock.unlock();
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java
deleted file mode 100644
index ac6fc5f..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-/**
- * Signals the exceptional case in which a controller is to be loaded with a
- * flow that is fundamentally different than its existing flow.
- * <p>
- * This is an unchecked exception; every constructor simply forwards its
- * arguments to the corresponding {@link RuntimeException} constructor.
- *
- * @author unattributed
- */
-public class UninheritableFlowException extends RuntimeException {
-
-    private static final long serialVersionUID = 198234798234794L;
-
-    /**
-     * Creates an exception with no detail message and no cause.
-     */
-    public UninheritableFlowException() {
-        super();
-    }
-
-    /**
-     * Creates an exception with a detail message only.
-     *
-     * @param message the detail message
-     */
-    public UninheritableFlowException(String message) {
-        super(message);
-    }
-
-    /**
-     * Creates an exception with a cause only.
-     *
-     * @param cause the underlying cause
-     */
-    public UninheritableFlowException(Throwable cause) {
-        super(cause);
-    }
-
-    /**
-     * Creates an exception with both a detail message and a cause.
-     *
-     * @param message the detail message
-     * @param cause the underlying cause
-     */
-    public UninheritableFlowException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    /**
-     * Creates an exception with full control over suppression and stack
-     * trace writability, as defined by the four-argument
-     * {@link RuntimeException} constructor.
-     *
-     * @param message the detail message
-     * @param cause the underlying cause
-     * @param enableSuppression whether suppression is enabled
-     * @param writableStackTrace whether the stack trace is writable
-     */
-    public UninheritableFlowException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
-        super(message, cause, enableSuppression, writableStackTrace);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/exception/FlowFileConsumptionException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/exception/FlowFileConsumptionException.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/exception/FlowFileConsumptionException.java
deleted file mode 100644
index 4285e65..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/exception/FlowFileConsumptionException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.exception;
-
-/**
- * Checked exception whose constructors forward directly to the matching
- * {@link Exception} constructors.
- * <p>
- * NOTE(review): judging by the name, this presumably reports a failure while
- * consuming FlowFiles; the throwing sites are not visible here, so confirm.
- */
-public class FlowFileConsumptionException extends Exception {
-
-    private static final long serialVersionUID = 18923749824378923L;
-
-    /**
-     * Creates an exception with no explanation and no cause.
-     */
-    public FlowFileConsumptionException() {
-        super();
-    }
-
-    /**
-     * Creates an exception with an explanation only.
-     *
-     * @param explanation the detail message
-     */
-    public FlowFileConsumptionException(final String explanation) {
-        super(explanation);
-    }
-
-    /**
-     * Creates an exception with a cause only.
-     *
-     * @param cause the underlying cause
-     */
-    public FlowFileConsumptionException(final Throwable cause) {
-        super(cause);
-    }
-
-    /**
-     * Creates an exception with both an explanation and a cause.
-     *
-     * @param explanation the detail message
-     * @param cause the underlying cause
-     */
-    public FlowFileConsumptionException(final String explanation, final Throwable cause) {
-        super(explanation, cause);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/exception/ReportingTaskInstantiationException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/exception/ReportingTaskInstantiationException.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/exception/ReportingTaskInstantiationException.java
deleted file mode 100644
index f407048..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/exception/ReportingTaskInstantiationException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.exception;
-
-/**
- * Checked exception with two constructors that forward to {@link Exception}.
- * <p>
- * NOTE(review): judging by the name, this presumably reports that a reporting
- * task could not be instantiated; the throwing sites are not visible here.
- */
-public class ReportingTaskInstantiationException extends Exception {
-
-    private static final long serialVersionUID = 189234789237L;
-
-    /**
-     * Creates an exception with a detail message only.
-     *
-     * @param message the detail message
-     */
-    public ReportingTaskInstantiationException(final String message) {
-        super(message);
-    }
-
-    /**
-     * Creates an exception whose detail message is the given class name,
-     * passed through unchanged, together with a cause.
-     *
-     * @param className used verbatim as the detail message
-     * @param t the underlying cause
-     */
-    public ReportingTaskInstantiationException(final String className, final Throwable t) {
-        super(className, t);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/exception/ValidationException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/exception/ValidationException.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/exception/ValidationException.java
deleted file mode 100644
index a125716..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/exception/ValidationException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.exception;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Unchecked exception that carries a list of validation error messages.
- * <p>
- * The list passed to the constructor is copied, so later changes to the
- * caller's list do not alter this exception. A {@code null} list is treated
- * as "no errors" so that building the message can never itself fail.
- */
-public class ValidationException extends RuntimeException {
-
-    private static final long serialVersionUID = 198023479823479L;
-
-    /** Immutable snapshot of the errors supplied at construction; never null. */
-    private final List<String> errors;
-
-    /**
-     * Creates an exception for the given validation errors.
-     *
-     * @param errors the validation error messages; may be {@code null}, which
-     *               is treated as an empty list
-     */
-    public ValidationException(final List<String> errors) {
-        if (errors == null) {
-            this.errors = Collections.emptyList();
-        } else {
-            // defensive copy: the exception must not change after it is thrown
-            this.errors = Collections.unmodifiableList(new ArrayList<>(errors));
-        }
-    }
-
-    /**
-     * Returns the validation errors captured at construction time.
-     *
-     * @return an unmodifiable list of error messages; never {@code null}
-     */
-    public List<String> getValidationErrors() {
-        return errors;
-    }
-
-    /**
-     * Builds a summary of the form {@code "<n> validation error"}; when there
-     * is exactly one error its text is appended after {@code ": "}, otherwise
-     * an {@code "s"} is appended and the individual errors are not listed.
-     *
-     * @return the summary message
-     */
-    @Override
-    public String getLocalizedMessage() {
-        final int count = errors.size();
-        final StringBuilder sb = new StringBuilder();
-        sb.append(count).append(" validation error");
-        if (count == 1) {
-            sb.append(": ").append(errors.get(0));
-        } else {
-            sb.append("s");
-        }
-        return sb.toString();
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/label/StandardLabel.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/label/StandardLabel.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/label/StandardLabel.java
deleted file mode 100644
index c13dd47..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/label/StandardLabel.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.label;
-
-import org.apache.nifi.controller.label.Label;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.nifi.connectable.Position;
-import org.apache.nifi.connectable.Size;
-import org.apache.nifi.groups.ProcessGroup;
-
-/**
- * {@link Label} implementation that keeps each mutable attribute (position,
- * size, style, value and owning process group) in its own
- * {@link AtomicReference}. The identifier is fixed at construction.
- * <p>
- * The style map is always stored as an unmodifiable map; updates replace the
- * stored map with a new merged one instead of mutating it in place.
- */
-public class StandardLabel implements Label {
-
-    private final String identifier;
-    private final AtomicReference<Position> position;
-    private final AtomicReference<Size> size;
-    private final AtomicReference<Map<String, String>> style;
-    private final AtomicReference<String> value;
-    private final AtomicReference<ProcessGroup> processGroup;
-
-    /**
-     * Creates a label at position (0, 0) with no styles and no process group.
-     *
-     * @param identifier the label's identifier
-     * @param value the label's initial value
-     */
-    public StandardLabel(final String identifier, final String value) {
-        this(identifier, new Position(0D, 0D), Collections.<String, String>emptyMap(), value, null);
-    }
-
-    /**
-     * Creates a label with the given attributes and an initial size of
-     * 150 x 150.
-     *
-     * @param identifier the label's identifier
-     * @param position the initial position
-     * @param style the initial styles; copied into an unmodifiable map
-     * @param value the initial value
-     * @param processGroup the owning process group, possibly {@code null}
-     */
-    public StandardLabel(final String identifier, final Position position, final Map<String, String> style, final String value, final ProcessGroup processGroup) {
-        final Map<String, String> styleSnapshot = Collections.unmodifiableMap(new HashMap<>(style));
-
-        this.identifier = identifier;
-        this.position = new AtomicReference<>(position);
-        this.size = new AtomicReference<>(new Size(150, 150));
-        this.style = new AtomicReference<>(styleSnapshot);
-        this.value = new AtomicReference<>(value);
-        this.processGroup = new AtomicReference<>(processGroup);
-    }
-
-    /**
-     * @return the current position
-     */
-    @Override
-    public Position getPosition() {
-        return position.get();
-    }
-
-    /**
-     * Replaces the position; a {@code null} argument is ignored.
-     *
-     * @param position the new position
-     */
-    @Override
-    public void setPosition(final Position position) {
-        if (position == null) {
-            return;
-        }
-        this.position.set(position);
-    }
-
-    /**
-     * @return the current size
-     */
-    @Override
-    public Size getSize() {
-        return size.get();
-    }
-
-    /**
-     * Replaces the size; a {@code null} argument is ignored.
-     *
-     * @param size the new size
-     */
-    @Override
-    public void setSize(final Size size) {
-        if (size == null) {
-            return;
-        }
-        this.size.set(size);
-    }
-
-    /**
-     * @return the identifier supplied at construction
-     */
-    public String getIdentifier() {
-        return identifier;
-    }
-
-    /**
-     * @return the current styles as an unmodifiable map
-     */
-    public Map<String, String> getStyle() {
-        return style.get();
-    }
-
-    /**
-     * Merges the given entries into the current styles: existing keys are
-     * overwritten, other existing entries are kept. A {@code null} argument
-     * is ignored.
-     *
-     * @param style the entries to merge in
-     */
-    public void setStyle(final Map<String, String> style) {
-        if (style == null) {
-            return;
-        }
-
-        Map<String, String> current;
-        Map<String, String> merged;
-        do {
-            // rebuild from the latest stored map on every attempt so that a
-            // concurrent update is not lost when compareAndSet fails
-            current = this.style.get();
-            final Map<String, String> combined = new HashMap<>(current);
-            combined.putAll(style);
-            merged = Collections.unmodifiableMap(combined);
-        } while (!this.style.compareAndSet(current, merged));
-    }
-
-    /**
-     * @return the current value
-     */
-    public String getValue() {
-        return value.get();
-    }
-
-    /**
-     * Replaces the value; {@code null} is stored as-is.
-     *
-     * @param value the new value
-     */
-    public void setValue(final String value) {
-        this.value.set(value);
-    }
-
-    /**
-     * Replaces the owning process group; {@code null} is stored as-is.
-     *
-     * @param group the new process group
-     */
-    public void setProcessGroup(final ProcessGroup group) {
-        this.processGroup.set(group);
-    }
-
-    /**
-     * @return the current process group, possibly {@code null}
-     */
-    public ProcessGroup getProcessGroup() {
-        return processGroup.get();
-    }
-}