You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:32 UTC
[09/51] [partial] incubator-nifi git commit: Initial code contribution
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connectable.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connectable.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connectable.java
new file mode 100644
index 0000000..3d5c75d
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connectable.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.connectable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.Triggerable;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+
+/**
+ * Represents a connectable component to which or from which data can flow.
+ */
+public interface Connectable extends Triggerable {
+
+ /**
+ * @return the unique identifier for this <code>Connectable</code>
+ */
+ String getIdentifier();
+
+ /**
+ * @return a Collection of all relationships for this Connectable
+ */
+ Collection<Relationship> getRelationships();
+
+ /**
+ * Returns the ProcessorRelationship whose name is given
+ *
+ * @param relationshipName
+ * @return a ProcessorRelationship whose name is given, or <code>null</code>
+ * if none exists
+ */
+ Relationship getRelationship(String relationshipName);
+
+ /**
+ * Adds the given connection to this Connectable.
+ *
+ * @param connection the connection to add
+ * @throws NullPointerException if the argument is null
+ * @throws IllegalArgumentException if the given Connection is not valid
+ */
+ void addConnection(Connection connection) throws IllegalArgumentException;
+
+ /**
+ * @return true if the Connectable is the destination of any other
+ * Connectable, false otherwise.
+ */
+ boolean hasIncomingConnection();
+
+ /**
+ *
+ * @param connection
+ * @throws IllegalStateException if the given Connection is not registered
+ * to <code>this</code>.
+ */
+ void removeConnection(Connection connection) throws IllegalStateException;
+
+ /**
+ * Updates any internal state that depends on the given connection. The
+ * given connection will share the same ID as the old connection.
+ *
+ * @param newConnection
+ * @throws IllegalStateException
+ */
+ void updateConnection(Connection newConnection) throws IllegalStateException;
+
+ /**
+ * @return a <code>Set</code> of all <code>Connection</code>s for which this
+ * <code>Connectable</code> is the destination
+ */
+ List<Connection> getIncomingConnections();
+
+ /**
+ * @return a <code>Set</code> of all <code>Connection</code>s for which this
+ * <code>Connectable</code> is the source; if no connections exist, will
+ * return an empty Collection. Guaranteed not null.
+ */
+ Set<Connection> getConnections();
+
+ /**
+ * @param relationship
+ * @return a <code>Set</code> of all <code>Connection</code>s that contain
+ * the given relationship for which this <code>Connectable</code> is the
+ * source
+ */
+ Set<Connection> getConnections(Relationship relationship);
+
+ /**
+ * Returns the position on the graph where this Connectable is located
+ *
+ * @return
+ */
+ Position getPosition();
+
+ /**
+ * Updates this component's position on the graph
+ *
+ * @param position
+ */
+ void setPosition(Position position);
+
+ /**
+ * @return the name of this Connectable
+ */
+ String getName();
+
+ /**
+ * Sets the name of this Connectable so that its name will be visible on the
+ * UI
+ * @param name
+ */
+ void setName(String name);
+
+ /**
+ * @return the comments of this Connectable
+ */
+ String getComments();
+
+ /**
+ * Sets the comments of this Connectable.
+ * @param comments
+ */
+ void setComments(String comments);
+
+ /**
+ * If true,
+ * {@link #onTrigger(nifi.processor.ProcessContext, nifi.processor.ProcessSessionFactory)}
+ * should be called even when this Connectable has no FlowFiles queued for
+ * processing
+ *
+ * @return
+ */
+ boolean isTriggerWhenEmpty();
+
+ /**
+ * Returns the ProcessGroup to which this <code>Connectable</code> belongs
+ *
+ * @return
+ */
+ ProcessGroup getProcessGroup();
+
+ /**
+ * Sets the new ProcessGroup to which this <code>Connectable</code> belongs
+ *
+ * @param group
+ */
+ void setProcessGroup(ProcessGroup group);
+
+ /**
+ *
+ * @param relationship the relationship
+ * @return true indicates flow files transferred to the given relationship
+ * should be terminated if the relationship is not connected to another
+ * FlowFileConsumer; false indicates they will not be terminated and the
+ * processor will not be valid until specified
+ */
+ boolean isAutoTerminated(Relationship relationship);
+
+ /**
+ * Indicates whether flow file content made by this connectable must be
+ * persisted
+ *
+ * @return
+ */
+ boolean isLossTolerant();
+
+ /**
+ * @param lossTolerant
+ */
+ void setLossTolerant(boolean lossTolerant);
+
+ /**
+ * @return the type of the Connectable
+ */
+ ConnectableType getConnectableType();
+
+ /**
+ * Returns the any validation errors for this connectable.
+ *
+ * @return
+ */
+ Collection<ValidationResult> getValidationErrors();
+
+ /**
+ * Returns the amount of time for which a FlowFile should be penalized when
+ * {@link ProcessSession#penalize(nifi.flowfile.FlowFile)} is called
+ *
+ * @param timeUnit
+ * @return
+ */
+ long getPenalizationPeriod(final TimeUnit timeUnit);
+
+ /**
+ * Returns a string representation for which a FlowFile should be penalized
+ * when {@link ProcessSession#penalize(nifi.flowfile.FlowFile)} is called
+ *
+ * @return
+ */
+ String getPenalizationPeriod();
+
+ /**
+ * @param timeUnit determines the unit of time to represent the yield
+ * period.
+ * @return
+ */
+ long getYieldPeriod(TimeUnit timeUnit);
+
+ /**
+ * returns the string representation for this Connectable's configured yield
+ * period
+ *
+ * @return
+ */
+ String getYieldPeriod();
+
+ /**
+ * Updates the amount of time that this Connectable should avoid being
+ * scheduled when the processor calls
+ * {@link nifi.processor.ProcessContext#yield() ProcessContext.yield()}
+ *
+ * @param yieldPeriod
+ */
+ void setYieldPeriod(String yieldPeriod);
+
+ /**
+ * Updates the amount of time that this Connectable will penalize FlowFiles
+ * when {@link ProcessSession#penalize(nifi.flowfile.FlowFile)} is called
+ * @param penalizationPeriod
+ */
+ void setPenalizationPeriod(String penalizationPeriod);
+
+ /**
+ * Causes the processor not to be scheduled for some period of time. This
+ * duration can be obtained and set via the
+ * {@link #getYieldPeriod(TimeUnit)} and
+ * {@link #setYieldPeriod(long, TimeUnit)} methods.
+ */
+ void yield();
+
+ /**
+ * Returns the time in milliseconds since Epoch at which this Connectable
+ * should no longer yield its threads
+ *
+ * @return
+ */
+ long getYieldExpiration();
+
+ /**
+ * Specifies whether or not this component is considered side-effect free,
+ * with respect to external systems.
+ *
+ * @return
+ */
+ boolean isSideEffectFree();
+
+ void verifyCanDelete() throws IllegalStateException;
+
+ void verifyCanDelete(boolean ignoreConnections) throws IllegalStateException;
+
+ void verifyCanStart() throws IllegalStateException;
+
+ void verifyCanStop() throws IllegalStateException;
+
+ void verifyCanUpdate() throws IllegalStateException;
+
+ void verifyCanEnable() throws IllegalStateException;
+
+ void verifyCanDisable() throws IllegalStateException;
+
+ SchedulingStrategy getSchedulingStrategy();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/ConnectableType.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/ConnectableType.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/ConnectableType.java
new file mode 100644
index 0000000..0334bfb
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/ConnectableType.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.connectable;
+
+import javax.xml.bind.annotation.XmlEnum;
+
+@XmlEnum
+public enum ConnectableType {
+
+ PROCESSOR,
+ /**
+ * Port that lives within an RemoteProcessGroup and is used to send data to
+ * remote NiFi instances
+ */
+ REMOTE_INPUT_PORT,
+ /**
+ * Port that lives within a RemoteProcessGroup and is used to receive data
+ * from remote NiFi instances
+ */
+ REMOTE_OUTPUT_PORT,
+ /**
+ * Root Group Input Ports and Local Input Ports
+ */
+ INPUT_PORT,
+ /**
+ * Root Group Output Ports and Local Output Ports
+ */
+ OUTPUT_PORT,
+ FUNNEL
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connection.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connection.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connection.java
new file mode 100644
index 0000000..0a0089d
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Connection.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.connectable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.processor.Relationship;
+
+public interface Connection {
+
+ void enqueue(FlowFileRecord flowFile);
+
+ void enqueue(Collection<FlowFileRecord> flowFiles);
+
+ Connectable getDestination();
+
+ Collection<Relationship> getRelationships();
+
+ FlowFileQueue getFlowFileQueue();
+
+ String getIdentifier();
+
+ String getName();
+
+ void setName(String name);
+
+ void setBendPoints(List<Position> position);
+
+ List<Position> getBendPoints();
+
+ int getLabelIndex();
+
+ void setLabelIndex(int labelIndex);
+
+ long getZIndex();
+
+ void setZIndex(long zIndex);
+
+ Connectable getSource();
+
+ void setRelationships(Collection<Relationship> newRelationships);
+
+ void setDestination(final Connectable newDestination);
+
+ void setProcessGroup(ProcessGroup processGroup);
+
+ ProcessGroup getProcessGroup();
+
+ void lock();
+
+ void unlock();
+
+ List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords);
+
+ void verifyCanUpdate() throws IllegalStateException;
+
+ void verifyCanDelete() throws IllegalStateException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Funnel.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Funnel.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Funnel.java
new file mode 100644
index 0000000..cceca8f
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Funnel.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.connectable;
+
+import org.apache.nifi.controller.ScheduledState;
+
+public interface Funnel extends Connectable {
+
+ void setScheduledState(ScheduledState scheduledState);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Port.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Port.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Port.java
new file mode 100644
index 0000000..907dd92
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Port.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.connectable;
+
+public interface Port extends Connectable {
+
+ void shutdown();
+
+ boolean isValid();
+
+ /**
+ * <p>
+ * This method is called just before a Port is scheduled to run, giving the
+ * Port a chance to initialize any resources needed.</p>
+ */
+ void onSchedulingStart();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Position.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Position.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Position.java
new file mode 100644
index 0000000..75d04f5
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Position.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.connectable;
+
+public class Position {
+
+ private final double x;
+ private final double y;
+
+ public Position(final double x, final double y) {
+ this.x = x;
+ this.y = y;
+ }
+
+ public double getX() {
+ return x;
+ }
+
+ public double getY() {
+ return y;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Size.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Size.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Size.java
new file mode 100644
index 0000000..cea13d2
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/connectable/Size.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.connectable;
+
+public class Size {
+
+ private final double width;
+ private final double height;
+
+ public Size(final double width, final double height) {
+ this.width = width;
+ this.height = height;
+ }
+
+ public double getWidth() {
+ return width;
+ }
+
+ public double getHeight() {
+ return height;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
new file mode 100644
index 0000000..ef4b72a
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.nifi.components.ConfigurableComponent;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.nar.NarCloseable;
+
+public abstract class AbstractConfiguredComponent implements ConfigurableComponent, ConfiguredComponent {
+
+ private final String id;
+ private final ConfigurableComponent component;
+ private final ValidationContextFactory validationContextFactory;
+ private final ControllerServiceProvider serviceProvider;
+
+ private final AtomicReference<String> name = new AtomicReference<>();
+ private final AtomicReference<String> annotationData = new AtomicReference<>();
+
+ private final Lock lock = new ReentrantLock();
+ private final ConcurrentMap<PropertyDescriptor, String> properties = new ConcurrentHashMap<>();
+
+ public AbstractConfiguredComponent(final ConfigurableComponent component, final String id,
+ final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
+ this.id = id;
+ this.component = component;
+ this.validationContextFactory = validationContextFactory;
+ this.serviceProvider = serviceProvider;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return id;
+ }
+
+ @Override
+ public String getName() {
+ return name.get();
+ }
+
+ @Override
+ public void setName(final String name) {
+ this.name.set(Objects.requireNonNull(name).intern());
+ }
+
+ @Override
+ public String getAnnotationData() {
+ return annotationData.get();
+ }
+
+ @Override
+ public void setAnnotationData(final String data) {
+ annotationData.set(data);
+ }
+
+ @Override
+ public void setProperty(final String name, final String value) {
+ if (null == name || null == value) {
+ throw new IllegalArgumentException();
+ }
+
+ lock.lock();
+ try {
+ verifyModifiable();
+
+ try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+ final PropertyDescriptor descriptor = component.getPropertyDescriptor(name);
+
+ final String oldValue = properties.put(descriptor, value);
+ if (!value.equals(oldValue)) {
+
+ if (descriptor.getControllerServiceDefinition() != null) {
+ if (oldValue != null) {
+ final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(oldValue);
+ if (oldNode != null) {
+ oldNode.removeReference(this);
+ }
+ }
+
+ final ControllerServiceNode newNode = serviceProvider.getControllerServiceNode(value);
+ if (newNode != null) {
+ newNode.addReference(this);
+ }
+ }
+
+ try {
+ component.onPropertyModified(descriptor, oldValue, value);
+ } catch (final Throwable t) {
+ // nothing really to do here...
+ }
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Removes the property and value for the given property name if a
+ * descriptor and value exists for the given name. If the property is
+ * optional its value might be reset to default or will be removed entirely
+ * if was a dynamic property.
+ *
+ * @param name the property to remove
+ * @return true if removed; false otherwise
+ * @throws java.lang.IllegalArgumentException if the name is null
+ */
+ @Override
+ public boolean removeProperty(final String name) {
+ if (null == name) {
+ throw new IllegalArgumentException();
+ }
+
+ lock.lock();
+ try {
+ verifyModifiable();
+
+ try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+ final PropertyDescriptor descriptor = component.getPropertyDescriptor(name);
+ String value = null;
+ if (!descriptor.isRequired() && (value = properties.remove(descriptor)) != null) {
+ component.onPropertyModified(descriptor, value, null);
+ return true;
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ return false;
+ }
+
+ @Override
+ public Map<PropertyDescriptor, String> getProperties() {
+ try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+ final List<PropertyDescriptor> supported = component.getPropertyDescriptors();
+ if (supported == null || supported.isEmpty()) {
+ return Collections.unmodifiableMap(properties);
+ } else {
+ final Map<PropertyDescriptor, String> props = new LinkedHashMap<>();
+ for (final PropertyDescriptor descriptor : supported) {
+ props.put(descriptor, null);
+ }
+ props.putAll(properties);
+ return props;
+ }
+ }
+ }
+
+ @Override
+ public String getProperty(final PropertyDescriptor property) {
+ return properties.get(property);
+ }
+
+ @Override
+ public int hashCode() {
+ return 273171 * id.hashCode();
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+
+ if (!(obj instanceof ConfiguredComponent)) {
+ return false;
+ }
+
+ final ConfiguredComponent other = (ConfiguredComponent) obj;
+ return id.equals(other.getIdentifier());
+ }
+
+ @Override
+ public String toString() {
+ try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+ return component.toString();
+ }
+ }
+
+ @Override
+ public Collection<ValidationResult> validate(final ValidationContext context) {
+ try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+ return component.validate(context);
+ }
+ }
+
+ @Override
+ public PropertyDescriptor getPropertyDescriptor(final String name) {
+ try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+ return component.getPropertyDescriptor(name);
+ }
+ }
+
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+ try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+ component.onPropertyModified(descriptor, oldValue, newValue);
+ }
+ }
+
+ @Override
+ public List<PropertyDescriptor> getPropertyDescriptors() {
+ try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+ return component.getPropertyDescriptors();
+ }
+ }
+
+ @Override
+ public boolean isValid() {
+ final Collection<ValidationResult> validationResults = validate(validationContextFactory.newValidationContext(getProperties(), getAnnotationData()));
+ for (final ValidationResult result : validationResults) {
+ if (!result.isValid()) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public Collection<ValidationResult> getValidationErrors() {
+ final List<ValidationResult> results = new ArrayList<>();
+ lock.lock();
+ try {
+ final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(), getAnnotationData());
+
+ final Collection<ValidationResult> validationResults;
+ try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+ validationResults = component.validate(validationContext);
+ }
+
+ for (final ValidationResult result : validationResults) {
+ if (!result.isValid()) {
+ results.add(result);
+ }
+ }
+ } catch (final Throwable t) {
+ results.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString()).valid(false).build());
+ } finally {
+ lock.unlock();
+ }
+ return results;
+ }
+
+ public abstract void verifyModifiable() throws IllegalStateException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
new file mode 100644
index 0000000..e1d2dd4
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
@@ -0,0 +1,636 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.connectable.Position;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.FormatUtils;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+public abstract class AbstractPort implements Port {
+
+ public static final Relationship PORT_RELATIONSHIP = new Relationship.Builder()
+ .description("The relationship through which all Flow Files are transferred")
+ .name("")
+ .build();
+
+ public static final long MINIMUM_PENALIZATION_MILLIS = 0L;
+ public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
+
+ public static final long MINIMUM_YIELD_MILLIS = 0L;
+ public static final long DEFAULT_YIELD_PERIOD = 10000L;
+ public static final TimeUnit DEFAULT_YIELD_TIME_UNIT = TimeUnit.MILLISECONDS;
+
+ private final List<Relationship> relationships;
+
+ private final String id;
+ private final ConnectableType type;
+ private final AtomicReference<String> name;
+ private final AtomicReference<Position> position;
+ private final AtomicReference<String> comments;
+ private final AtomicReference<ProcessGroup> processGroup;
+ private final AtomicBoolean lossTolerant;
+ private final AtomicReference<ScheduledState> scheduledState;
+ private final AtomicInteger concurrentTaskCount;
+ private final AtomicReference<String> penalizationPeriod;
+ private final AtomicReference<String> yieldPeriod;
+ private final AtomicReference<String> schedulingPeriod;
+ private final AtomicLong schedulingNanos;
+ private final AtomicLong yieldExpiration;
+ private final ProcessScheduler processScheduler;
+
+ private final Set<Connection> outgoingConnections;
+ private final List<Connection> incomingConnections;
+
+ private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+ private final Lock readLock = rwLock.readLock();
+ private final Lock writeLock = rwLock.writeLock();
+
+ public AbstractPort(final String id, final String name, final ProcessGroup processGroup, final ConnectableType type, final ProcessScheduler scheduler) {
+ this.id = requireNonNull(id);
+ this.name = new AtomicReference<>(requireNonNull(name));
+ position = new AtomicReference<>(new Position(0D, 0D));
+ outgoingConnections = new HashSet<>();
+ incomingConnections = new ArrayList<>();
+ comments = new AtomicReference<>();
+ lossTolerant = new AtomicBoolean(false);
+ concurrentTaskCount = new AtomicInteger(1);
+ processScheduler = scheduler;
+
+ final List<Relationship> relationshipList = new ArrayList<>();
+ relationshipList.add(PORT_RELATIONSHIP);
+ relationships = Collections.unmodifiableList(relationshipList);
+ this.processGroup = new AtomicReference<>(processGroup);
+ this.type = type;
+ penalizationPeriod = new AtomicReference<>("30 sec");
+ yieldPeriod = new AtomicReference<>("1 sec");
+ yieldExpiration = new AtomicLong(0L);
+ schedulingPeriod = new AtomicReference<>("0 millis");
+ schedulingNanos = new AtomicLong(30000);
+ scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
+ }
+
+ @Override
+ public String getIdentifier() {
+ return id;
+ }
+
+ @Override
+ public String getName() {
+ return name.get();
+ }
+
+ @Override
+ public void setName(final String name) {
+ if (this.name.get().equals(name)) {
+ return;
+ }
+
+ final ProcessGroup parentGroup = this.processGroup.get();
+ if (getConnectableType() == ConnectableType.INPUT_PORT) {
+ if (parentGroup.getInputPortByName(name) != null) {
+ throw new IllegalStateException("Cannot rename port from " + this.name.get() + " to " + name + " because the ProcessGroup already has an Input Port named " + name);
+ }
+ } else if (getConnectableType() == ConnectableType.OUTPUT_PORT) {
+ if (parentGroup.getOutputPortByName(name) != null) {
+ throw new IllegalStateException("Cannot rename port from " + this.name.get() + " to " + name + " because the ProcessGroup already has an Output Port named " + name);
+ }
+ }
+
+ this.name.set(name);
+ }
+
+ @Override
+ public ProcessGroup getProcessGroup() {
+ return processGroup.get();
+ }
+
+ @Override
+ public void setProcessGroup(final ProcessGroup newGroup) {
+ this.processGroup.set(newGroup);
+ }
+
+ @Override
+ public String getComments() {
+ return comments.get();
+ }
+
+ @Override
+ public void setComments(final String comments) {
+ this.comments.set(comments);
+ }
+
+ @Override
+ public Collection<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ public Relationship getRelationship(final String relationshipName) {
+ if (PORT_RELATIONSHIP.getName().equals(relationshipName)) {
+ return PORT_RELATIONSHIP;
+ }
+ return null;
+ }
+
+ @Override
+ public void addConnection(final Connection connection) throws IllegalArgumentException {
+ writeLock.lock();
+ try {
+ if (!requireNonNull(connection).getSource().equals(this)) {
+ if (connection.getDestination().equals(this)) {
+ // don't add the connection twice. This may occur if we have a self-loop because we will be told
+ // to add the connection once because we are the source and again because we are the destination.
+ if (!incomingConnections.contains(connection)) {
+ incomingConnections.add(connection);
+ }
+
+ return;
+ } else {
+ throw new IllegalArgumentException("Cannot add a connection to a LocalPort for which the LocalPort is neither the Source nor the Destination");
+ }
+ }
+
+ for (final Relationship relationship : connection.getRelationships()) {
+ if (!relationship.equals(PORT_RELATIONSHIP)) {
+ throw new IllegalArgumentException("No relationship with name " + relationship + " exists for Local Ports");
+ }
+ }
+
+ // don't add the connection twice. This may occur if we have a self-loop because we will be told
+ // to add the connection once because we are the source and again because we are the destination.
+ if (!outgoingConnections.contains(connection)) {
+ outgoingConnections.add(connection);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean hasIncomingConnection() {
+ readLock.lock();
+ try {
+ return !incomingConnections.isEmpty();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+ final ProcessSession session = sessionFactory.createSession();
+
+ try {
+ onTrigger(context, session);
+ session.commit();
+ } catch (final ProcessException e) {
+ session.rollback();
+ throw e;
+ } catch (final Throwable t) {
+ session.rollback();
+ throw new RuntimeException(t);
+ }
+ }
+
+ public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
+
+ @Override
+ public void updateConnection(final Connection connection) throws IllegalStateException {
+ if (requireNonNull(connection).getSource().equals(this)) {
+ writeLock.lock();
+ try {
+ if (!outgoingConnections.remove(connection)) {
+ throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port");
+ }
+ outgoingConnections.add(connection);
+ } finally {
+ writeLock.unlock();
+ }
+ } else if (connection.getDestination().equals(this)) {
+ writeLock.lock();
+ try {
+ if (!incomingConnections.remove(connection)) {
+ throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port");
+ }
+ incomingConnections.add(connection);
+ } finally {
+ writeLock.unlock();
+ }
+ } else {
+ throw new IllegalStateException("The given connection is not currently registered for this Port");
+ }
+ }
+
+ @Override
+ public void removeConnection(final Connection connection) throws IllegalArgumentException, IllegalStateException {
+ writeLock.lock();
+ try {
+ if (!requireNonNull(connection).getSource().equals(this)) {
+ final boolean existed = incomingConnections.remove(connection);
+ if (!existed) {
+ throw new IllegalStateException("The given connection is not currently registered for this Port");
+ }
+ return;
+ }
+
+ if (!canConnectionBeRemoved(connection)) {
+ // TODO: Determine which processors will be broken if connection is removed, rather than just returning a boolean
+ throw new IllegalStateException(connection + " cannot be removed");
+ }
+
+ final boolean removed = outgoingConnections.remove(connection);
+ if (!removed) {
+ throw new IllegalStateException(connection + " is not registered with " + this);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Verify that removing this connection will not prevent this Port from
+ * still being connected via each relationship
+ *
+ * @param connection
+ * @return
+ */
+ private boolean canConnectionBeRemoved(final Connection connection) {
+ final Connectable source = connection.getSource();
+ if (!source.isRunning()) {
+ // we don't have to verify that this Connectable is still connected because it's okay to make
+ // the source invalid since it is not running.
+ return true;
+ }
+
+ for (final Relationship relationship : source.getRelationships()) {
+ if (source.isAutoTerminated(relationship)) {
+ continue;
+ }
+
+ final Set<Connection> connectionsForRelationship = source.getConnections(relationship);
+ if (connectionsForRelationship == null || connectionsForRelationship.isEmpty()) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public Set<Connection> getConnections() {
+ readLock.lock();
+ try {
+ return Collections.unmodifiableSet(outgoingConnections);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public Set<Connection> getConnections(final Relationship relationship) {
+ readLock.lock();
+ try {
+ if (relationship.equals(PORT_RELATIONSHIP)) {
+ return Collections.unmodifiableSet(outgoingConnections);
+ }
+
+ throw new IllegalArgumentException("No relationship with name " + relationship.getName() + " exists for Local Ports");
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public Position getPosition() {
+ return position.get();
+ }
+
+ @Override
+ public void setPosition(final Position position) {
+ this.position.set(position);
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("name", getName()).append("id", getIdentifier()).toString();
+ }
+
+ @Override
+ public List<Connection> getIncomingConnections() {
+ readLock.lock();
+ try {
+ return Collections.unmodifiableList(incomingConnections);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Indicates whether or not this Port is valid.
+ *
+ * @return
+ */
+ @Override
+ public abstract boolean isValid();
+
+ @Override
+ public boolean isAutoTerminated(final Relationship relationship) {
+ return false;
+ }
+
+ @Override
+ public boolean isLossTolerant() {
+ return lossTolerant.get();
+ }
+
+ @Override
+ public void setLossTolerant(boolean lossTolerant) {
+ this.lossTolerant.set(lossTolerant);
+ }
+
+ @Override
+ public void setMaxConcurrentTasks(final int taskCount) {
+ if (taskCount < 1) {
+ throw new IllegalArgumentException();
+ }
+ concurrentTaskCount.set(taskCount);
+ }
+
+ /**
+ * @return the number of tasks that may execute concurrently for this
+ * processor
+ */
+ @Override
+ public int getMaxConcurrentTasks() {
+ return concurrentTaskCount.get();
+ }
+
+ /**
+ *
+ */
+ @Override
+ public void shutdown() {
+ scheduledState.set(ScheduledState.STOPPED);
+ }
+
+ @Override
+ public void onSchedulingStart() {
+ scheduledState.set(ScheduledState.RUNNING);
+ }
+
+ public void disable() {
+ final boolean updated = scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED);
+ if (!updated) {
+ throw new IllegalStateException("Port cannot be disabled because it is not stopped");
+ }
+ }
+
+ public void enable() {
+ final boolean updated = scheduledState.compareAndSet(ScheduledState.DISABLED, ScheduledState.STOPPED);
+ if (!updated) {
+ throw new IllegalStateException("Port cannot be enabled because it is not disabled");
+ }
+ }
+
+ @Override
+ public boolean isRunning() {
+ return getScheduledState().equals(ScheduledState.RUNNING) || processScheduler.getActiveThreadCount(this) > 0;
+ }
+
+ @Override
+ public ScheduledState getScheduledState() {
+ return scheduledState.get();
+ }
+
+ @Override
+ public ConnectableType getConnectableType() {
+ return type;
+ }
+
+ /**
+ * Updates the amount of time that this processor should avoid being
+ * scheduled when the processor calls
+ * {@link nifi.processor.ProcessContext#yield() ProcessContext.yield()}
+ *
+ * @param yieldPeriod
+ */
+ @Override
+ public void setYieldPeriod(final String yieldPeriod) {
+ final long yieldMillis = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS);
+ if (yieldMillis < 0) {
+ throw new IllegalArgumentException("Yield duration must be positive");
+ }
+ this.yieldPeriod.set(yieldPeriod);
+ }
+
+ /**
+ * @param schedulingPeriod
+ */
+ @Override
+ public void setScheduldingPeriod(final String schedulingPeriod) {
+ final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), TimeUnit.NANOSECONDS);
+ if (schedulingNanos < 0) {
+ throw new IllegalArgumentException("Scheduling Period must be positive");
+ }
+
+ this.schedulingPeriod.set(schedulingPeriod);
+ this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos));
+ }
+
+ @Override
+ public long getPenalizationPeriod(final TimeUnit timeUnit) {
+ return FormatUtils.getTimeDuration(getPenalizationPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit);
+ }
+
+ @Override
+ public String getPenalizationPeriod() {
+ return penalizationPeriod.get();
+ }
+
+ /**
+ * Causes the processor not to be scheduled for some period of time. This
+ * duration can be obtained and set via the
+ * {@link #getYieldPeriod(TimeUnit)} and
+ * {@link #setYieldPeriod(long, TimeUnit)} methods.
+ */
+ @Override
+ public void yield() {
+ final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS);
+ yieldExpiration.set(Math.max(yieldExpiration.get(), System.currentTimeMillis() + yieldMillis));
+ }
+
+ @Override
+ public long getYieldExpiration() {
+ return yieldExpiration.get();
+ }
+
+ @Override
+ public long getSchedulingPeriod(final TimeUnit timeUnit) {
+ return timeUnit.convert(schedulingNanos.get(), TimeUnit.NANOSECONDS);
+ }
+
+ @Override
+ public String getSchedulingPeriod() {
+ return schedulingPeriod.get();
+ }
+
+ @Override
+ public void setPenalizationPeriod(final String penalizationPeriod) {
+ this.penalizationPeriod.set(penalizationPeriod);
+ }
+
+ @Override
+ public String getYieldPeriod() {
+ return yieldPeriod.get();
+ }
+
+ @Override
+ public long getYieldPeriod(final TimeUnit timeUnit) {
+ return FormatUtils.getTimeDuration(getYieldPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit);
+ }
+
+ @Override
+ public void verifyCanDelete() throws IllegalStateException {
+ verifyCanDelete(false);
+ }
+
+ @Override
+ public void verifyCanDelete(final boolean ignoreConnections) {
+ readLock.lock();
+ try {
+ if (isRunning()) {
+ throw new IllegalStateException(this + " is running");
+ }
+
+ if (!ignoreConnections) {
+ for (final Connection connection : outgoingConnections) {
+ connection.verifyCanDelete();
+ }
+
+ for (final Connection connection : incomingConnections) {
+ if (connection.getSource().equals(this)) {
+ connection.verifyCanDelete();
+ } else {
+ throw new IllegalStateException(this + " is the destination of another component");
+ }
+ }
+ }
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public void verifyCanStart() {
+ readLock.lock();
+ try {
+ if (scheduledState.get() != ScheduledState.STOPPED) {
+ throw new IllegalStateException(this + " is not stopped");
+ }
+ verifyNoActiveThreads();
+
+ final Collection<ValidationResult> validationResults = getValidationErrors();
+ if (!validationResults.isEmpty()) {
+ throw new IllegalStateException(this + " is not in a valid state: " + validationResults.iterator().next().getExplanation());
+ }
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public void verifyCanStop() {
+ if (getScheduledState() != ScheduledState.RUNNING) {
+ throw new IllegalStateException(this + " is not scheduled to run");
+ }
+ }
+
+ @Override
+ public void verifyCanUpdate() {
+ readLock.lock();
+ try {
+ if (isRunning()) {
+ throw new IllegalStateException(this + " is not stopped");
+ }
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public void verifyCanEnable() {
+ readLock.lock();
+ try {
+ if (getScheduledState() != ScheduledState.DISABLED) {
+ throw new IllegalStateException(this + " is not disabled");
+ }
+
+ verifyNoActiveThreads();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public void verifyCanDisable() {
+ readLock.lock();
+ try {
+ if (getScheduledState() != ScheduledState.STOPPED) {
+ throw new IllegalStateException(this + " is not stopped");
+ }
+ verifyNoActiveThreads();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ private void verifyNoActiveThreads() throws IllegalStateException {
+ final int threadCount = processScheduler.getActiveThreadCount(this);
+ if (threadCount > 0) {
+ throw new IllegalStateException(this + " has " + threadCount + " threads still active");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Availability.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Availability.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Availability.java
new file mode 100644
index 0000000..38df6f7
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Availability.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+public enum Availability {
+
+ CLUSTER_MANAGER_ONLY,
+ NODE_ONLY,
+ BOTH;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
new file mode 100644
index 0000000..5b95524
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
+
+public interface ConfiguredComponent {
+
+ public String getIdentifier();
+
+ public String getName();
+
+ public void setName(String name);
+
+ public String getAnnotationData();
+
+ public void setAnnotationData(String data);
+
+ public void setProperty(String name, String value);
+
+ /**
+ * Removes the property and value for the given property name if a
+ * descriptor and value exists for the given name. If the property is
+ * optional its value might be reset to default or will be removed entirely
+ * if was a dynamic property.
+ *
+ * @param name the property to remove
+ * @return true if removed; false otherwise
+ * @throws java.lang.IllegalArgumentException if the name is null
+ */
+ public boolean removeProperty(String name);
+
+ public Map<PropertyDescriptor, String> getProperties();
+
+ public String getProperty(final PropertyDescriptor property);
+
+ boolean isValid();
+
+ /**
+ * Returns the any validation errors for this connectable.
+ *
+ * @return
+ */
+ Collection<ValidationResult> getValidationErrors();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ContentAvailability.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ContentAvailability.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ContentAvailability.java
new file mode 100644
index 0000000..eee878e
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ContentAvailability.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+/**
+ * Provides information about whether or not the data referenced in a Provenance
+ * Event can be replayed or downloaded
+ */
+public interface ContentAvailability {
+
+ /**
+ * Returns a boolean indicating whether or not the Input content is
+ * available
+ *
+ * @return
+ */
+ boolean isInputAvailable();
+
+ /**
+ * Returns a boolean indicating whether or not the Output content is
+ * available
+ *
+ * @return
+ */
+ boolean isOutputAvailable();
+
+ /**
+ * Returns <code>true</code> if the Input content is the same as the Output
+ * content
+ *
+ * @return
+ */
+ boolean isContentSame();
+
+ /**
+ * Returns a boolean indicating whether or not the content is replayable. If
+ * this returns <code>false</code>, the reason that replay is not available
+ * can be determined by calling {@link #getReasonNotReplayable()}.
+ *
+ * @return
+ */
+ boolean isReplayable();
+
+ /**
+ * Returns the reason that the content cannot be replayed, or
+ * <code>null</code> if the content can be replayed.
+ *
+ * @return
+ */
+ String getReasonNotReplayable();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Counter.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Counter.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Counter.java
new file mode 100644
index 0000000..eaa0c48
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Counter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+public interface Counter {
+
+ void adjust(long delta);
+
+ String getName();
+
+ long getValue();
+
+ String getContext();
+
+ String getIdentifier();
+
+ void reset();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/EventBasedWorker.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/EventBasedWorker.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/EventBasedWorker.java
new file mode 100644
index 0000000..280f69d
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/EventBasedWorker.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.nifi.connectable.Connectable;
+
+/**
+ * Wraps a Connectable object and maintains a count of how many unanswered
+ * events have been reported for the Connectable
+ */
+public interface EventBasedWorker {
+
+ Connectable getConnectable();
+
+ int incrementEventCount();
+
+ int decrementEventCount();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java
new file mode 100644
index 0000000..1195bc9
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+public interface Heartbeater {
+
+ void heartbeat();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
new file mode 100644
index 0000000..303f540
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.processor.annotation.OnScheduled;
+import org.apache.nifi.processor.annotation.OnUnscheduled;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+
+public interface ProcessScheduler {
+
+ /**
+ * Shuts down the scheduler, stopping all components
+ */
+ void shutdown();
+
+ /**
+ * Starts scheduling the given processor to run after invoking all methods
+ * on the underlying {@link nifi.processor.Processor FlowFileProcessor} that
+ * are annotated with the {@link OnScheduled} annotation. If the Processor
+ * is already scheduled to run, does nothing.
+ *
+ * @param procNode
+ * @throws IllegalStateException if the Processor is disabled
+ */
+ void startProcessor(ProcessorNode procNode);
+
+ /**
+ * Stops scheduling the given processor to run and invokes all methods on
+ * the underlying {@link nifi.processor.Processor FlowFileProcessor} that
+ * are annotated with the {@link OnUnscheduled} annotation. This does not
+ * interrupt any threads that are currently running within the given
+ * Processor. If the Processor is not scheduled to run, does nothing.
+ * @param procNode
+ */
+ void stopProcessor(ProcessorNode procNode);
+
+ /**
+ * Starts scheduling the given Port to run. If the Port is already scheduled
+ * to run, does nothing.
+ *
+ * @param port
+ *
+ * @throws IllegalStateException if the Port is disabled
+ */
+ void startPort(Port port);
+
+ /**
+ * Stops scheduling the given Port to run. This does not interrupt any
+ * threads that are currently running within the given port. This does not
+ * interrupt any threads that are currently running within the given Port.
+ * If the Port is not scheduled to run, does nothing.
+ *
+ * @param port
+ */
+ void stopPort(Port port);
+
+ /**
+ * Starts scheduling the given Funnel to run. If the funnel is already
+ * scheduled to run, does nothing.
+ *
+ * @param funnel
+ *
+ * @throws IllegalStateException if the Funnel is disabled
+ */
+ void startFunnel(Funnel funnel);
+
+ /**
+ * Stops scheduling the given Funnel to run. This does not interrupt any
+ * threads that are currently running within the given funnel. If the funnel
+ * is not scheduled to run, does nothing.
+ *
+ * @param funnel
+ */
+ void stopFunnel(Funnel funnel);
+
+ void enableFunnel(Funnel funnel);
+
+ void enablePort(Port port);
+
+ void enableProcessor(ProcessorNode procNode);
+
+ void disableFunnel(Funnel funnel);
+
+ void disablePort(Port port);
+
+ void disableProcessor(ProcessorNode procNode);
+
+ /**
+ * Returns the number of threads currently active for the given
+ * <code>Connectable</code>.
+ *
+ * @param scheduled
+ * @return
+ */
+ int getActiveThreadCount(Object scheduled);
+
+ /**
+ * Returns a boolean indicating whether or not the given object is scheduled
+ * to run
+ *
+ * @param scheduled
+ * @return
+ */
+ boolean isScheduled(Object scheduled);
+
+ /**
+ * Registers a relevant event for an Event-Driven worker
+ *
+ * @param worker
+ */
+ void registerEvent(Connectable worker);
+
+ /**
+ * Notifies the ProcessScheduler of how many threads are available to use
+ * for the given {@link SchedulingStrategy}
+ *
+ * @param strategy
+ * @param maxThreadCount
+ */
+ void setMaxThreadCount(SchedulingStrategy strategy, int maxThreadCount);
+
+ /**
+ * Notifies the Scheduler that it should stop scheduling the given component
+ * until its yield duration has expired
+ *
+ * @param procNode
+ */
+ void yield(ProcessorNode procNode);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
new file mode 100644
index 0000000..f6786fa
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+
+public abstract class ProcessorNode extends AbstractConfiguredComponent implements Connectable {
+
+ public ProcessorNode(final Processor processor, final String id,
+ final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
+ super(processor, id, validationContextFactory, serviceProvider);
+ }
+
+ public abstract boolean isIsolated();
+
+ public abstract boolean isTriggerWhenAnyDestinationAvailable();
+
+ @Override
+ public abstract boolean isSideEffectFree();
+
+ public abstract boolean isTriggeredSerially();
+
+ public abstract boolean isEventDrivenSupported();
+
+ public abstract boolean isHighThroughputSupported();
+
+ @Override
+ public abstract boolean isValid();
+
+ public abstract void setScheduledState(ScheduledState scheduledState);
+
+ public abstract void setBulletinLevel(LogLevel bulletinLevel);
+
+ public abstract LogLevel getBulletinLevel();
+
+ public abstract Processor getProcessor();
+
+ public abstract void yield(long period, TimeUnit timeUnit);
+
+ public abstract void setAutoTerminatedRelationships(Set<Relationship> relationships);
+
+ public abstract Set<Relationship> getAutoTerminatedRelationships();
+
+ public abstract void setSchedulingStrategy(SchedulingStrategy schedulingStrategy);
+
+ @Override
+ public abstract SchedulingStrategy getSchedulingStrategy();
+
+ public abstract void setRunDuration(long duration, TimeUnit timeUnit);
+
+ public abstract long getRunDuration(TimeUnit timeUnit);
+
+ public abstract Map<String, String> getStyle();
+
+ public abstract void setStyle(Map<String, String> style);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
new file mode 100644
index 0000000..6b8ede0
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.ReportingTask;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+
+public interface ReportingTaskNode extends ConfiguredComponent {
+
+ Availability getAvailability();
+
+ void setAvailability(Availability availability);
+
+ void setSchedulingStrategy(SchedulingStrategy schedulingStrategy);
+
+ SchedulingStrategy getSchedulingStrategy();
+
+ /**
+ * @return a string representation of the time between each scheduling
+ * period
+ */
+ String getSchedulingPeriod();
+
+ long getSchedulingPeriod(TimeUnit timeUnit);
+
+ /**
+ * Updates how often the ReportingTask should be triggered to run
+ * @param schedulingPeriod
+ */
+ void setScheduldingPeriod(String schedulingPeriod);
+
+ ReportingTask getReportingTask();
+
+ ReportingContext getReportingContext();
+
+ ConfigurationContext getConfigurationContext();
+
+ boolean isRunning();
+}