You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2016/07/14 22:33:10 UTC
[04/10] nifi git commit: NIFI-1896 This closes #650. Refactored
nifi-api into nifi-framework-api and other locations. The nifi-api is
specific to that which is needed for intended extension points.
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1129706/nifi-framework-api/src/main/java/org/apache/nifi/authorization/resource/Authorizable.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/authorization/resource/Authorizable.java b/nifi-framework-api/src/main/java/org/apache/nifi/authorization/resource/Authorizable.java
new file mode 100644
index 0000000..7cb21ae
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/authorization/resource/Authorizable.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.authorization.resource;
+
+import org.apache.nifi.authorization.AccessDeniedException;
+import org.apache.nifi.authorization.AuthorizationRequest;
+import org.apache.nifi.authorization.AuthorizationResult;
+import org.apache.nifi.authorization.AuthorizationResult.Result;
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.RequestAction;
+import org.apache.nifi.authorization.Resource;
+import org.apache.nifi.authorization.UserContextKeys;
+import org.apache.nifi.authorization.user.NiFiUser;
+
+import java.util.Map;
+import java.util.HashMap;
+
+public interface Authorizable {
+
+    /**
+     * The parent for this Authorizable. May be null.
+     *
+     * @return the parent authorizable or null
+     */
+    Authorizable getParentAuthorizable();
+
+    /**
+     * The Resource for this Authorizable.
+     *
+     * @return the resource
+     */
+    Resource getResource();
+
+    /**
+     * Returns whether the specified user is authorized for the specified action on this resource. This does not
+     * imply a direct access attempt by the user; for a direct access attempt use Authorizable.authorize().
+     *
+     * @param authorizer authorizer
+     * @param action action
+     * @param user user
+     * @return is authorized
+     */
+    default boolean isAuthorized(Authorizer authorizer, RequestAction action, NiFiUser user) {
+        return Result.Approved.equals(checkAuthorization(authorizer, action, user).getResult());
+    }
+
+    /**
+     * Returns the result of an authorization request for the specified user for the specified action on this
+     * resource. This method does not imply the user is directly attempting to access the resource. If the user is
+     * attempting a direct access use Authorizable.authorize(). A ResourceNotFound result defers to the parent.
+     *
+     * @param authorizer authorizer
+     * @param action action
+     * @param user user
+     * @param resourceContext resource context, may be null
+     * @return the authorizer's result, the parent's result, or denied if the resource is not found and there is no parent
+     */
+    default AuthorizationResult checkAuthorization(Authorizer authorizer, RequestAction action, NiFiUser user, Map<String, String> resourceContext) {
+        final Map<String,String> userContext;
+        if (user.getClientAddress() != null && !user.getClientAddress().trim().isEmpty()) {
+            userContext = new HashMap<>();
+            userContext.put(UserContextKeys.CLIENT_ADDRESS.name(), user.getClientAddress());
+        } else {
+            userContext = null;
+        }
+
+        // build the request
+        final AuthorizationRequest request = new AuthorizationRequest.Builder()
+                .identity(user.getIdentity())
+                .anonymous(user.isAnonymous())
+                .accessAttempt(false)
+                .action(action)
+                .resource(getResource())
+                .resourceContext(resourceContext)
+                .userContext(userContext)
+                .build();
+
+        // perform the authorization
+        final AuthorizationResult result = authorizer.authorize(request);
+
+        if (!Result.ResourceNotFound.equals(result.getResult())) {
+            return result;
+        }
+
+        // defer to the parent, passing along the caller's resource context
+        final Authorizable parent = getParentAuthorizable();
+        if (parent == null) {
+            return AuthorizationResult.denied();
+        }
+        return parent.checkAuthorization(authorizer, action, user, resourceContext);
+    }
+
+    /**
+     * Returns the result of an authorization request for the specified user for the specified action on this
+     * resource, without any resource context. This method does not imply the user is directly attempting to access
+     * the resource. If the user is attempting a direct access use Authorizable.authorize().
+     *
+     * @param authorizer authorizer
+     * @param action action
+     * @param user user
+     * @return the authorization result
+     */
+    default AuthorizationResult checkAuthorization(Authorizer authorizer, RequestAction action, NiFiUser user) {
+        return checkAuthorization(authorizer, action, user, null);
+    }
+
+    /**
+     * Authorizes the specified user for the specified action on this resource. This method does imply the user is
+     * directly accessing the resource. A ResourceNotFound result defers to the parent with the same resource context.
+     *
+     * @param authorizer authorizer
+     * @param action action
+     * @param user user
+     * @param resourceContext resource context, may be null
+     * @throws AccessDeniedException if access is denied, or the resource is not found and there is no parent
+     */
+    default void authorize(Authorizer authorizer, RequestAction action, NiFiUser user, Map<String, String> resourceContext) throws AccessDeniedException {
+        final Map<String,String> userContext;
+        if (user.getClientAddress() != null && !user.getClientAddress().trim().isEmpty()) {
+            userContext = new HashMap<>();
+            userContext.put(UserContextKeys.CLIENT_ADDRESS.name(), user.getClientAddress());
+        } else {
+            userContext = null;
+        }
+
+        final AuthorizationRequest request = new AuthorizationRequest.Builder()
+                .identity(user.getIdentity())
+                .anonymous(user.isAnonymous())
+                .accessAttempt(true)
+                .action(action)
+                .resource(getResource())
+                .resourceContext(resourceContext)
+                .userContext(userContext)
+                .build();
+
+        final AuthorizationResult result = authorizer.authorize(request);
+        if (Result.ResourceNotFound.equals(result.getResult())) {
+            final Authorizable parent = getParentAuthorizable();
+            if (parent == null) {
+                throw new AccessDeniedException("Access is denied");
+            }
+            parent.authorize(authorizer, action, user, resourceContext);
+        } else if (Result.Denied.equals(result.getResult())) {
+            throw new AccessDeniedException(result.getExplanation());
+        }
+    }
+
+    /**
+     * Authorizes the specified user for the specified action on this resource, without a resource context; implies direct access.
+     *
+     * @param authorizer authorizer
+     * @param action action
+     * @param user user
+     * @throws AccessDeniedException if access is denied
+     */
+    default void authorize(Authorizer authorizer, RequestAction action, NiFiUser user) throws AccessDeniedException {
+        authorize(authorizer, action, user, null);
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1129706/nifi-framework-api/src/main/java/org/apache/nifi/authorization/user/NiFiUser.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/authorization/user/NiFiUser.java b/nifi-framework-api/src/main/java/org/apache/nifi/authorization/user/NiFiUser.java
new file mode 100644
index 0000000..c450bc4
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/authorization/user/NiFiUser.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.authorization.user;
+
+/**
+ * A representation of a NiFi user that has logged into the application
+ */
+public interface NiFiUser {
+
+ /**
+ * @return the unique identity of this user
+ */
+ String getIdentity();
+
+ /**
+ * @return the next user in the proxied entities chain, or <code>null</code> if no more users exist in the chain.
+ */
+ NiFiUser getChain();
+
+ /**
+ * @return <code>true</code> if the user is the unauthenticated Anonymous user
+ */
+ boolean isAnonymous();
+
+ /**
+ * @return the address of the client that made the request which created this user; presumably <code>null</code> or blank if unknown (TODO confirm)
+ */
+ String getClientAddress();
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1129706/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProvider.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProvider.java b/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProvider.java
new file mode 100644
index 0000000..e1e4352
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProvider.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.state;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.nifi.components.ConfigurableComponent;
+
+/**
+ * <p>
+ * Provides a mechanism by which components can store and retrieve state. Depending on the Provider, the state
+ * may be stored locally, or it may be stored on a remote resource.
+ * </p>
+ *
+ * <p>
+ * Which implementation should be used for local and clustered state is configured in the NiFi properties file.
+ * It is therefore possible to provide custom implementations of this interface. Note, however, that this interface
+ * is new as of version 0.5.0 of Apache NiFi and may not be considered "stable" as of yet. Therefore, it is subject
+ * to change without notice, so providing custom implementations is cautioned against until the API becomes more stable.
+ * </p>
+ *
+ * @since 0.5.0
+ */
+public interface StateProvider extends ConfigurableComponent {
+
+ /**
+ * Initializes the StateProvider so that it is capable of being used. This method will be called once before any
+ * of the other methods are called and will not be called again until the {@link #shutdown()} method has been called.
+ *
+ * @param context the initialization context that can be used to prepare the state provider for use
+ * @throws IOException if initialization fails; the exact conditions are implementation-specific
+ */
+ void initialize(StateProviderInitializationContext context) throws IOException;
+
+ /**
+ * Shuts down the StateProvider and cleans up any resources held by it. Once this method has returned, the
+ * StateProvider may be initialized once again via the {@link #initialize(StateProviderInitializationContext)} method.
+ */
+ void shutdown();
+
+ /**
+ * Updates the value of the component's state, setting the new value to the
+ * given state.
+ *
+ * @param state the value to change the state to
+ * @param componentId the id of the component for which state is being set
+ *
+ * @throws IOException if unable to communicate with the underlying storage mechanism
+ */
+ void setState(Map<String, String> state, String componentId) throws IOException;
+
+
+ /**
+ * Returns the currently configured state for the component. The returned StateMap will never be null.
+ * The version of the StateMap will be -1 and the state will contain no key/value pairs if the state has never been set.
+ *
+ * @param componentId the id of the component for which state is to be retrieved
+ * @return the currently configured value for the component's state
+ *
+ * @throws IOException if unable to communicate with the underlying storage mechanism
+ */
+ StateMap getState(String componentId) throws IOException;
+
+
+ /**
+ * Updates the value of the component's state to the new value if and only if the value currently
+ * is the same as the given oldValue.
+ *
+ * @param oldValue the old value to compare against
+ * @param newValue the new value to use if and only if the state's value is the same as the given oldValue
+ * @param componentId the id of the component for which state is being replaced
+ * @return <code>true</code> if the state was updated to the new value, <code>false</code> if the state's value was not
+ * equal to oldValue
+ *
+ * @throws IOException if unable to communicate with the underlying storage mechanism
+ */
+ boolean replace(StateMap oldValue, Map<String, String> newValue, String componentId) throws IOException;
+
+ /**
+ * Removes all values from the state that is stored for the given component.
+ *
+ * @param componentId the id of the component for which state is being cleared
+ *
+ * @throws IOException if unable to communicate with the underlying storage mechanism
+ */
+ void clear(String componentId) throws IOException;
+
+ /**
+ * This method is called whenever a component is removed from the NiFi instance. This allows the State Provider to
+ * perform tasks when a component is removed in order to clean up resources that may be associated with that component.
+ *
+ * @param componentId the ID of the component that was removed from the NiFi instance
+ * @throws IOException if unable to perform the necessary cleanup
+ */
+ void onComponentRemoved(String componentId) throws IOException;
+
+ /**
+ * Notifies the state provider that it should begin servicing requests to store and retrieve state
+ */
+ void enable();
+
+ /**
+ * Notifies the state provider that it should stop servicing requests to store and retrieve state and instead throw a ProviderDisabledException if any request is made to do so
+ */
+ void disable();
+
+ /**
+ * @return <code>true</code> if the provider is enabled, <code>false</code> otherwise.
+ */
+ boolean isEnabled();
+
+ /**
+ * Provides a listing of {@link Scope}s supported by the StateProvider
+ * @return the {@link Scope}s supported by the configuration
+ */
+ Scope[] getSupportedScopes();
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1129706/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java b/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java
new file mode 100644
index 0000000..aaf5490
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.state;
+
+import java.util.Map;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+
+/**
+ * This interface defines an initialization context that is passed to a {@link StateProvider} when it
+ * is initialized.
+ */
+public interface StateProviderInitializationContext {
+ /**
+ * @return the identifier of the StateProvider
+ */
+ String getIdentifier();
+
+ /**
+ * @return a Map of Property Descriptors to their configured values
+ */
+ Map<PropertyDescriptor, PropertyValue> getProperties();
+
+ /**
+ * Returns the configured value for the given property
+ *
+ * @param property the property to retrieve the value for
+ *
+ * @return the configured value for the property.
+ */
+ PropertyValue getProperty(PropertyDescriptor property);
+
+ /**
+ * @return the SSL Context that should be used to communicate with remote resources,
+ * or <code>null</code> if no SSLContext has been configured
+ */
+ SSLContext getSSLContext();
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1129706/nifi-framework-api/src/main/java/org/apache/nifi/controller/Snippet.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/Snippet.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/Snippet.java
new file mode 100644
index 0000000..702b7cd
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/Snippet.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.nifi.web.Revision;
+
+import java.util.Map;
+
+/**
+ * A Snippet represents a segment of the flow
+ */
+public interface Snippet {
+
+    /**
+     * @return the identifier of this snippet
+     */
+    String getId();
+
+    /**
+     * @return the id of the group that is the parent of the components in this snippet
+     */
+    String getParentGroupId();
+
+    /**
+     * @return the connections contained in this snippet
+     */
+    Map<String, Revision> getConnections();
+
+    /**
+     * @return the funnels contained in this snippet
+     */
+    Map<String, Revision> getFunnels();
+
+    /**
+     * @return the input ports contained in this snippet
+     */
+    Map<String, Revision> getInputPorts();
+
+    /**
+     * @return the output ports contained in this snippet
+     */
+    Map<String, Revision> getOutputPorts();
+
+    /**
+     * @return the labels contained in this snippet
+     */
+    Map<String, Revision> getLabels();
+
+    /**
+     * @return identifiers of every ProcessGroup in this Snippet
+     */
+    Map<String, Revision> getProcessGroups();
+
+    /**
+     * @return identifiers of every Processor in this Snippet
+     */
+    Map<String, Revision> getProcessors();
+
+    /**
+     * @return identifiers of every RemoteProcessGroup in this Snippet
+     */
+    Map<String, Revision> getRemoteProcessGroups();
+
+    /**
+     * @return whether this snippet is empty
+     */
+    boolean isEmpty();
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1129706/nifi-framework-api/src/main/java/org/apache/nifi/controller/Triggerable.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/Triggerable.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/Triggerable.java
new file mode 100644
index 0000000..4b3149b
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/Triggerable.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+
+public interface Triggerable {
+
+    long MINIMUM_SCHEDULING_NANOS = 30000L;
+
+    /**
+     * <p>
+     * Invoked by the controller each time this processor is triggered to do
+     * work. Multiple threads may invoke this method at the same time. When it
+     * is invoked is governed by how this processor has been configured within
+     * a controller to be triggered (timing based or event
+     * based).</p>
+     *
+     * <p>
+     * An implementation may commit the session, roll it back, or leave session
+     * management to the framework. Leaving it to the framework is the
+     * recommended approach, in which case the outcome depends on several
+     * factors: when this method exits because of an exception the session is
+     * rolled back; when it returns normally the session is either committed or
+     * reused by the framework for another
+     * processor down stream.</p>
+     *
+     * @param context the context in which the component is triggered
+     * @param sessionFactory factory for the {@link ProcessSession}s used to
+     * operate on flow files within the repository
+     *
+     * @throws ProcessException when processing does not complete normally but
+     * the failure is an understood, potential outcome of processing. The
+     * controller/caller handles such exceptions gracefully (for example by
+     * logging them). If any other type of exception is allowed to propagate,
+     * the controller may stop triggering this processor, because that would
+     * indicate a probable coding defect.
+     */
+    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException;
+
+    /**
+     * Sets how many tasks may be running concurrently for this
+     * <code>Triggerable</code>.
+     *
+     * @param taskCount the number of concurrent tasks this processor may have
+     * running
+     * @throws IllegalArgumentException if the given value is less than 1
+     */
+    void setMaxConcurrentTasks(int taskCount);
+
+    /**
+     * @return how many tasks are allowed to execute concurrently for this
+     * <code>Triggerable</code>.
+     */
+    int getMaxConcurrentTasks();
+
+    /**
+     * Reports the {@link ScheduledState} of this <code>Triggerable</code>.
+     * Note that a stopped state does NOT mean the <code>Triggerable</code> has
+     * no active threads; it means only that it is not currently scheduled to
+     * be given any more threads. Whether the <code>Triggerable</code> still
+     * has active threads can be determined via
+     * {@code ProcessScheduler#getActiveThreadCount(Connectable)}.
+     *
+     * @return the schedule state
+     */
+    ScheduledState getScheduledState();
+
+    /**
+     * Reports whether this <code>Triggerable</code> is "running". That is the
+     * case when it is scheduled to run, and also when it is no longer
+     * scheduled to be given threads but the threads from the most recent
+     * invocation of {@link #onTrigger(ProcessContext, ProcessSessionFactory)}
+     * have not yet returned.
+     *
+     * @return true if running; false otherwise
+     */
+    boolean isRunning();
+
+    /**
+     * @param timeUnit the unit in which to express the scheduling period
+     * @return the amount of time between each scheduling period
+     */
+    long getSchedulingPeriod(TimeUnit timeUnit);
+
+    /**
+     * @return the time between each scheduling period, as a string
+     * representation
+     */
+    String getSchedulingPeriod();
+
+    /**
+     * Changes how often this Triggerable should be triggered to run.
+     *
+     * @param schedulingPeriod the scheduling period to set
+     */
+    void setScheduldingPeriod(String schedulingPeriod);
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1129706/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
new file mode 100644
index 0000000..0f258b0
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.queue;
+
+/**
+ * Represents the state that a Drop FlowFile request is in
+ */
+public enum DropFlowFileState {
+
+    WAITING_FOR_LOCK("Waiting for destination component to complete its action"),
+    DROPPING_FLOWFILES("Dropping FlowFiles from queue"),
+    FAILURE("Failed"),
+    CANCELED("Canceled by user"),
+    COMPLETE("Completed successfully");
+
+    private final String description;
+
+    DropFlowFileState(final String text) {
+        description = text;
+    }
+
+    @Override
+    public String toString() {
+        return this.description;
+    }
+
+    /**
+     * @param description the string form of a drop flow file state, as produced by {@link #toString()}
+     * @return the DropFlowFileState whose string form equals the description, or null if none does
+     */
+    public static DropFlowFileState valueOfDescription(String description) {
+        final DropFlowFileState[] candidates = DropFlowFileState.values();
+        for (final DropFlowFileState candidate : candidates) {
+            // toString() yields the description, so compare the field directly
+            if (candidate.description.equals(description)) {
+                return candidate;
+            }
+        }
+
+        // no constant carries the given description
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1129706/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
new file mode 100644
index 0000000..737fbe3
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+/**
+ * Represents the status of a Drop FlowFile Request that has been issued to
+ * a {@link FlowFileQueue}. When a queue is requested to drop its FlowFiles,
+ * that process may be rather lengthy in the case of a poorly behaving
+ * FlowFileRepository or if the destination Processor is polling from the
+ * queue using a filter that is misbehaving. As a result, the dropping of
+ * FlowFiles is performed asynchronously.
+ *
+ * This status object provides information about how far along in the process
+ * we currently are and information about the success or failure of the
+ * operation.
+ */
+public interface DropFlowFileStatus {
+
+ /**
+ * @return the identifier of the request to drop FlowFiles from the queue
+ */
+ String getRequestIdentifier();
+
+ /**
+ * @return the date/time (in milliseconds since epoch) at which the request to
+ * drop the FlowFiles from a queue was submitted
+ */
+ long getRequestSubmissionTime();
+
+ /**
+ * @return the date/time (in milliseconds since epoch) at which the status of the
+ * request was last updated
+ */
+ long getLastUpdated();
+
+ /**
+ * @return the size of the queue when the drop request was issued or <code>null</code> if
+ * it is not yet known, which can happen if the {@link DropFlowFileState} is
+ * {@link DropFlowFileState#WAITING_FOR_LOCK}.
+ */
+ QueueSize getOriginalSize();
+
+ /**
+ * @return the current size of the queue or <code>null</code> if it is not yet known
+ */
+ QueueSize getCurrentSize();
+
+ /**
+ * @return a QueueSize representing the number of FlowFiles that have been dropped for this request
+ * and the aggregate size of those FlowFiles
+ */
+ QueueSize getDroppedSize();
+
+ /**
+ * @return the current state of the drop request
+ */
+ DropFlowFileState getState();
+
+ /**
+ * @return the reason that the state is set to {@link DropFlowFileState#FAILURE}, or <code>null</code> if the state is not {@link DropFlowFileState#FAILURE}.
+ */
+ String getFailureReason();
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1129706/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
new file mode 100644
index 0000000..7948ecb
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.queue;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.processor.FlowFileFilter;
+
+public interface FlowFileQueue {
+
+ /**
+ * @return the unique identifier for this FlowFileQueue
+ */
+ String getIdentifier();
+
+ /**
+ * @return list of processing priorities for this queue
+ */
+ List<FlowFilePrioritizer> getPriorities();
+
+ /**
+ * Reads any Swap Files that belong to this queue and returns a summary of what is swapped out.
+ * This will be called only during NiFi startup as an initialization step. This
+ * method is then responsible for returning a SwapSummary of the FlowFiles that are swapped
+ * out, or <code>null</code> if no FlowFiles are swapped out for this queue.
+ *
+ * @return a SwapSummary that describes the FlowFiles that exist in the queue but are swapped out.
+ */
+ SwapSummary recoverSwappedFlowFiles();
+
+ /**
+ * Destroys any Swap Files that exist for this queue without updating the FlowFile Repository
+ * or Provenance Repository. This is done only on startup in the case of non-persistent
+ * repositories. In the case of non-persistent repositories, we may still have Swap Files because
+ * we may still need to overflow the FlowFiles from heap onto disk, even though we don't want to keep
+ * the FlowFiles on restart.
+ */
+ void purgeSwapFiles();
+
+ /**
+ * Resets the comparator used by this queue to maintain order.
+ *
+ * @param newPriorities the ordered list of prioritizers to use to determine
+ * order within this queue.
+ * @throws NullPointerException if newPriorities is null
+ */
+ void setPriorities(List<FlowFilePrioritizer> newPriorities);
+
+ /**
+ * Establishes this queue's preferred maximum work load.
+ *
+ * @param maxQueueSize the maximum number of flow files this processor
+ * recommends having in its work queue at any one time
+ */
+ void setBackPressureObjectThreshold(long maxQueueSize);
+
+ /**
+ * @return maximum number of flow files that should be queued up at any one
+ * time
+ */
+ long getBackPressureObjectThreshold();
+
+ /**
+ * @param maxDataSize Establishes this queue's preferred maximum data size.
+ */
+ void setBackPressureDataSizeThreshold(String maxDataSize);
+
+ /**
+ * @return maximum data size that should be queued up at any one time
+ */
+ String getBackPressureDataSizeThreshold();
+
+ QueueSize size();
+
+ /**
+ * @return true if no items are queued; false otherwise
+ */
+ boolean isEmpty();
+
+ /**
+ * @return <code>true</code> if the queue is empty or contains only FlowFiles that already are being processed
+ * by others, <code>false</code> if the queue contains at least one FlowFile that is available for processing,
+ * regardless of whether that FlowFile(s) is in-memory or swapped out.
+ */
+ boolean isActiveQueueEmpty();
+
+ /**
+ * Returns a QueueSize that represents all FlowFiles that are 'unacknowledged'. A FlowFile
+ * is considered to be unacknowledged if it has been pulled from the queue by some component
+ * but the session that pulled the FlowFile has not yet been committed or rolled back.
+ *
+ * @return a QueueSize that represents all FlowFiles that are 'unacknowledged'.
+ */
+ QueueSize getUnacknowledgedQueueSize();
+
+ void acknowledge(FlowFileRecord flowFile);
+
+ void acknowledge(Collection<FlowFileRecord> flowFiles);
+
+ /**
+ * @return true if maximum queue size has been reached or exceeded; false
+ * otherwise
+ */
+ boolean isFull();
+
+ /**
+ * places the given file into the queue
+ *
+ * @param file to place into queue
+ */
+ void put(FlowFileRecord file);
+
+ /**
+ * places the given files into the queue
+ *
+ * @param files to place into queue
+ */
+ void putAll(Collection<FlowFileRecord> files);
+
+ /**
+ * @param expiredRecords expired records
+ * @return the next flow file on the queue; null if empty
+ */
+ FlowFileRecord poll(Set<FlowFileRecord> expiredRecords);
+
+ /**
+ * @param maxResults limits how many results can be polled
+ * @param expiredRecords for expired records
+ * @return the next flow files on the queue up to the max results; null if
+ * empty
+ */
+ List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords);
+
+ /**
+ * Drains flow files from the given source queue into the given destination
+ * list.
+ *
+ * @param sourceQueue queue to drain from
+ * @param destination Collection to drain to
+ * @param maxResults max number to drain
+ * @param expiredRecords for expired records
+ * @return size (bytes) of flow files drained from queue
+ */
+ long drainQueue(Queue<FlowFileRecord> sourceQueue, List<FlowFileRecord> destination, int maxResults, Set<FlowFileRecord> expiredRecords);
+
+ List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords);
+
+ String getFlowFileExpiration();
+
+ int getFlowFileExpiration(TimeUnit timeUnit);
+
+ void setFlowFileExpiration(String flowExpirationPeriod);
+
+ /**
+ * Initiates a request to drop all FlowFiles in this queue. This method returns
+ * a DropFlowFileStatus that can be used to determine the current state of the request.
+ * Additionally, the DropFlowFileStatus provides a request identifier that can then be
+ * passed to the {@link #getDropFlowFileStatus(String)} and {@link #cancelDropFlowFileRequest(String)}
+ * methods in order to obtain the status later or cancel a request.
+ *
+ * @param requestIdentifier the identifier of the Drop FlowFile Request
+ * @param requestor the entity that is requesting that the FlowFiles be dropped; this will be
+ * included in the Provenance Events that are generated.
+ *
+ * @return the status of the drop request.
+ */
+ DropFlowFileStatus dropFlowFiles(String requestIdentifier, String requestor);
+
+ /**
+ * Returns the current status of a Drop FlowFile Request that was initiated via the
+ * {@link #dropFlowFiles(String, String)} method that has the given identifier
+ *
+ * @param requestIdentifier the identifier of the Drop FlowFile Request
+ * @return the status for the request with the given identifier, or <code>null</code> if no
+ * request status exists with that identifier
+ */
+ DropFlowFileStatus getDropFlowFileStatus(String requestIdentifier);
+
+ /**
+ * Cancels the request to drop FlowFiles that has the given identifier. After this method is called, the request
+ * will no longer be known by this queue, so subsequent calls to {@link #getDropFlowFileStatus(String)} or
+ * {@link #cancelDropFlowFileRequest(String)} will return <code>null</code>
+ *
+ * @param requestIdentifier the identifier of the Drop FlowFile Request
+ * @return the status for the request with the given identifier after it has been canceled, or <code>null</code> if no
+ * request status exists with that identifier
+ */
+ DropFlowFileStatus cancelDropFlowFileRequest(String requestIdentifier);
+
+ /**
+ * <p>
+ * Initiates a request to obtain a listing of FlowFiles in this queue. This method returns a
+ * ListFlowFileStatus that can be used to obtain information about the FlowFiles that exist
+ * within the queue. Additionally, the ListFlowFileStatus provides a request identifier that
+ * can then be passed to the {@link #getListFlowFileStatus(String)}. The listing of FlowFiles
+ * will be returned ordered by the position of the FlowFile in the queue.
+ * </p>
+ *
+ * <p>
+ * Note that if maxResults is larger than the size of the "active queue" (i.e., the un-swapped queue),
+ * FlowFiles that are swapped out will not be returned.
+ * </p>
+ *
+ * @param requestIdentifier the identifier of the List FlowFile Request
+ * @param maxResults the maximum number of FlowFileSummary objects to add to the ListFlowFileStatus
+ *
+ * @return the status for the request
+ *
+ * @throws IllegalStateException if either the source or the destination of the connection to which this queue belongs
+ * is currently running.
+ */
+ ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults);
+
+ /**
+ * Returns the current status of a List FlowFile Request that was initiated via the {@link #listFlowFiles(String, int)}
+ * method that has the given identifier
+ *
+ * @param requestIdentifier the identifier of the List FlowFile Request
+ * @return the current status of the List FlowFile Request with the given identifier or <code>null</code> if no
+ * request status exists with that identifier
+ */
+ ListFlowFileStatus getListFlowFileStatus(String requestIdentifier);
+
+ /**
+ * Cancels the request to list FlowFiles that has the given identifier. After this method is called, the request
+ * will no longer be known by this queue, so subsequent calls to {@link #getListFlowFileStatus(String)} or
+ * {@link #cancelListFlowFileRequest(String)} will return <code>null</code>
+ *
+ * @param requestIdentifier the identifier of the List FlowFile Request
+ * @return the current status of the List FlowFile Request with the given identifier or <code>null</code> if no
+ * request status exists with that identifier
+ */
+ ListFlowFileStatus cancelListFlowFileRequest(String requestIdentifier);
+
+ /**
+ * Returns the FlowFile with the given UUID or <code>null</code> if no FlowFile can be found in this queue
+ * with the given UUID
+ *
+ * @param flowFileUuid the UUID of the FlowFile to retrieve
+ * @return the FlowFile with the given UUID or <code>null</code> if no FlowFile can be found in this queue
+ * with the given UUID
+ *
+ * @throws IOException if unable to read FlowFiles that are stored on some external device
+ */
+ FlowFileRecord getFlowFile(String flowFileUuid) throws IOException;
+
+ /**
+ * Ensures that a listing can be performed on the queue
+ *
+ * @throws IllegalStateException if the queue is not in a state in which a listing can be performed
+ */
+ void verifyCanList() throws IllegalStateException;
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1129706/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java
new file mode 100644
index 0000000..b7207f2
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+/**
+ * A summary of a FlowFile that can be used to represent a "high level" view of a FlowFile
+ * without providing all of the information available.
+ */
+public interface FlowFileSummary {
+ /**
+ * @return the UUID of the FlowFile
+ */
+ String getUuid();
+
+ /**
+ * @return the value of the 'filename' attribute
+ */
+ String getFilename();
+
+ /**
+ * @return the current position of the FlowFile in the queue based on the prioritizers selected
+ */
+ int getPosition();
+
+ /**
+ * @return the size of the FlowFile in bytes
+ */
+ long getSize();
+
+ /**
+ * @return the timestamp (in milliseconds since epoch) at which the FlowFile was added to the queue
+ */
+ long getLastQueuedTime();
+
+ /**
+ * @return the timestamp (in milliseconds since epoch) at which the FlowFile's greatest ancestor entered the flow
+ */
+ long getLineageStartDate();
+
+ /**
+ * @return <code>true</code> if the FlowFile is penalized, <code>false</code> otherwise
+ */
+ boolean isPenalized();
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1129706/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileState.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileState.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileState.java
new file mode 100644
index 0000000..eb417aa
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileState.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+/**
+ * Represents the state that a List FlowFile Request is in; {@link #toString()} yields the description
+ */
+public enum ListFlowFileState {
+ WAITING_FOR_LOCK("Waiting for other queue requests to complete"),
+ CALCULATING_LIST("Calculating list of FlowFiles"),
+ FAILURE("Failed"),
+ CANCELED("Canceled by user"),
+ COMPLETE("Completed successfully");
+
+ private final String description;
+
+ private ListFlowFileState(final String description) {
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return description;
+ }
+
+ /**
+ * @param description the description to look up, compared exactly against each state's {@link #toString()}
+ * @return the matching ListFlowFileState or <code>null</code> if the description doesn't match
+ */
+ public static ListFlowFileState valueOfDescription(String description) {
+ ListFlowFileState desiredState = null;
+
+ for (ListFlowFileState state : values()) {
+ if (state.toString().equals(description)) {
+ desiredState = state;
+ break;
+ }
+ }
+
+ return desiredState;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1129706/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
new file mode 100644
index 0000000..e3cc337
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+import java.util.List;
+
+public interface ListFlowFileStatus {
+
+ /**
+ * @return the maximum number of FlowFile Summary objects that should be returned
+ */
+ int getMaxResults();
+
+ /**
+ * @return the identifier of the request to list FlowFiles in the queue
+ */
+ String getRequestIdentifier();
+
+ /**
+ * @return the date/time (in milliseconds since epoch) at which the request to
+ * list the FlowFiles in a queue was submitted
+ */
+ long getRequestSubmissionTime();
+
+ /**
+ * @return the date/time (in milliseconds since epoch) at which the status of the
+ * request was last updated
+ */
+ long getLastUpdated();
+
+ /**
+ * @return the current state of the operation
+ */
+ ListFlowFileState getState();
+
+ /**
+ * @return the reason that the state is set to a Failure state, or <code>null</code> if the state is not {@link ListFlowFileState#FAILURE}.
+ */
+ String getFailureReason();
+
+ /**
+ * @return the current size of the queue
+ */
+ QueueSize getQueueSize();
+
+ /**
+ * @return a List of FlowFileSummary objects
+ */
+ List<FlowFileSummary> getFlowFileSummaries();
+
+ /**
+ * @return the percentage (an integer between 0 and 100, inclusive) of how close the request is to being completed
+ */
+ int getCompletionPercentage();
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1129706/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java
new file mode 100644
index 0000000..30d285c
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+import java.util.Comparator;
+
+/**
+ * Specifies which column to sort on when performing a Listing of FlowFiles. NOTE(review):
+ * {@link FlowFileQueue#listFlowFiles(String, int)} takes no SortColumn; confirm where this is consumed
+ */
+public enum SortColumn implements Comparator<FlowFileSummary> {
+ /**
+ * Sort based on the current position in the queue
+ */
+ QUEUE_POSITION (new Comparator<FlowFileSummary>() {
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return Integer.compare(o1.getPosition(), o2.getPosition());
+ }
+ }),
+
+ /**
+ * Sort based on the UUID of the FlowFile
+ */
+ FLOWFILE_UUID (new Comparator<FlowFileSummary>() {
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return o1.getUuid().compareTo(o2.getUuid());
+ }
+ }),
+
+ /**
+ * Sort based on the 'filename' attribute of the FlowFile
+ */
+ FILENAME (new Comparator<FlowFileSummary>() {
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return o1.getFilename().compareTo(o2.getFilename());
+ }
+ }),
+
+ /**
+ * Sort based on the size of the FlowFile
+ */
+ FLOWFILE_SIZE(new Comparator<FlowFileSummary>() {
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return Long.compare(o1.getSize(), o2.getSize());
+ }
+ }),
+
+ /**
+ * Sort based on how long the FlowFile has been queued (negated last-queued time: most recently queued first)
+ */
+ QUEUED_DURATION (new Comparator<FlowFileSummary>() {
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return -Long.compare(o1.getLastQueuedTime(), o2.getLastQueuedTime());
+ }
+ }),
+
+ /**
+ * Sort based on the time at which the FlowFile's "greatest ancestor" entered the flow (the lineage
+ * start date), earliest first. NOTE(review): not negated like QUEUED_DURATION - confirm this is intended
+ */
+ FLOWFILE_AGE (new Comparator<FlowFileSummary>() {
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return Long.compare(o1.getLineageStartDate(), o2.getLineageStartDate());
+ }
+ }),
+
+ /**
+ * Sort based on whether or not the FlowFile is penalized (FlowFiles that are not penalized sort first)
+ */
+ PENALIZATION (new Comparator<FlowFileSummary>() {
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return Boolean.compare(o1.isPenalized(), o2.isPenalized());
+ }
+ });
+
+ private final Comparator<FlowFileSummary> comparator;
+
+ private SortColumn(final Comparator<FlowFileSummary> comparator) {
+ this.comparator = comparator;
+ }
+
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return comparator.compare(o1, o2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1129706/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/SortDirection.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/SortDirection.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/SortDirection.java
new file mode 100644
index 0000000..129e748
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/SortDirection.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+/**
+ * Specifies the order in which FlowFiles should be sorted when performing a listing of
+ * FlowFiles. NOTE(review): {@link FlowFileQueue#listFlowFiles(String, int)} takes no
+ * SortDirection; confirm where this enum is consumed
+ */
+public enum SortDirection {
+ /**
+ * FlowFiles should be sorted such that the FlowFile with the lowest value for the Sort Column
+ * should occur first in the listing.
+ */
+ ASCENDING,
+
+ /**
+ * FlowFiles should be sorted such that the FlowFile with the largest value for the Sort Column
+ * should occur first in the listing.
+ */
+ DESCENDING;
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1129706/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
new file mode 100644
index 0000000..b1ea87c
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+
+/**
+ * Defines the capabilities of a content repository. The methods offer no
+ * append option; instead, content can be combined by creating a new claim
+ * and using the merge capability to write existing claims into it.
+ *
+ */
+public interface ContentRepository {
+
+ /**
+ * Initializes the Content Repository, providing to it the
+ * ResourceClaimManager that is to be used for interacting with Content
+ * Claims
+ *
+ * @param claimManager to handle claims
+ * @throws java.io.IOException if unable to init
+ */
+ void initialize(ResourceClaimManager claimManager) throws IOException;
+
+ /**
+ * Shuts down the Content Repository, freeing any resources that may be
+ * held. This is called when an administrator shuts down NiFi.
+ */
+ void shutdown();
+
+ /**
+ * @return the names of all Containers that exist for this Content
+ * Repository
+ */
+ Set<String> getContainerNames();
+
+ /**
+ * @param containerName name of container to check capacity on
+ * @return the maximum number of bytes that can be stored in the storage
+ * mechanism that backs the container with the given name
+ * @throws java.io.IOException if unable to check capacity
+ * @throws IllegalArgumentException if no container exists with the given
+ * name
+ */
+ long getContainerCapacity(String containerName) throws IOException;
+
+ /**
+ * @param containerName to check space on
+ * @return the number of bytes available to be used by the storage
+ * mechanism that backs the container with the given name
+ * @throws java.io.IOException if unable to check space
+ * @throws IllegalArgumentException if no container exists with the given
+ * name
+ */
+ long getContainerUsableSpace(String containerName) throws IOException;
+
+ /**
+ * Creates a new content claim
+ *
+ * @param lossTolerant indicates whether the content for the new claim is
+ * loss tolerant. If true the repository might choose more volatile storage
+ * options which could increase performance for a tradeoff with reliability
+ * @return newly created claim
+ * @throws java.io.IOException if unable to create claim
+ */
+ ContentClaim create(boolean lossTolerant) throws IOException;
+
+ /**
+ * Increments the number of claimants for the given claim (NOTE: the method name misspells "Claimant")
+ *
+ * @param claim to increment
+ * @return the number of claimants after incrementing
+ */
+ int incrementClaimaintCount(ContentClaim claim);
+
+ /**
+ * Obtains the current number of claimants for the given claim
+ *
+ * @param claim to get count of
+ * @return the number of claimants
+ */
+ int getClaimantCount(ContentClaim claim);
+
+ /**
+ * Reduces the number of claimants for the given claim. Even if the given
+ * claim is null or content cannot be found or removed no exception will be
+ * thrown.
+ *
+ * @param claim to decrement
+ * @return new claimant count for the given claim
+ */
+ int decrementClaimantCount(ContentClaim claim);
+
+ /**
+ * Removes the content indicated by the given claim
+ *
+ * @param claim to remove
+ *
+ * @return a boolean indicating whether or not the destruction of the claim
+ * was successful
+ */
+ boolean remove(ContentClaim claim);
+
+ /**
+ * Clones the content for the given content claim and returns content claim
+ * of the new object
+ *
+ * @param original to clone
+ * @param lossTolerant if the clone can be placed in a loss tolerant repository
+ * @return new claim
+ * @throws IOException if an IO error occurs. Any content written to the new
+ * destination prior to the error will be destroyed
+ */
+ ContentClaim clone(ContentClaim original, boolean lossTolerant) throws IOException;
+
+ /**
+ * Creates a new content item that is the merger in iteration order of all
+ * content for the given claims
+ *
+ * @return the size of the destination
+ * @param claims the claims to merge which will be combined in order of
+ * collection iteration
+ * @param destination the claim to write the merged content to
+ * @param header if supplied will be prepended to the output
+ * @param footer if supplied will be appended to the output
+ * @param demarcator if supplied will be placed in between each merged
+ * object
+ * @throws IOException if unable to merge
+ * @throws IllegalArgumentException if the given destination is included in
+ * the given claims
+ */
+ long merge(Collection<ContentClaim> claims, ContentClaim destination, byte[] header, byte[] footer, byte[] demarcator) throws IOException;
+
+ /**
+ * Imports content from the given path creating a new content object and
+ * claim within the repository.
+ *
+ * @return the size of the claim
+ * @param content to import from
+ * @param claim the claim to write imported content to
+ * @throws IOException if failure to read given content
+ */
+ long importFrom(Path content, ContentClaim claim) throws IOException;
+
+ /**
+ * Imports content from the given stream creating a new content object and
+ * claim within the repository.
+ *
+ * @return the size of the claim
+ * @param content to import from
+ * @param claim the claim to write imported content to
+ * @throws IOException if unable to read content
+ */
+ long importFrom(InputStream content, ContentClaim claim) throws IOException;
+
+ /**
+ * Exports the content of the given claim to the given destination.
+ *
+ * @return the size of the destination or the claim
+ * @param claim to export from
+ * @param destination where to export data
+ * @param append if true appends to the destination; false overwrites
+ * @throws IOException if an IO error occurs. The state of the content for
+ * the given destination is unknown and callers should consider whether they
+ * should clean up any partially created paths
+ */
+ long exportTo(ContentClaim claim, Path destination, boolean append) throws IOException;
+
+ /**
+ * Exports the content of the given claim to the given destination.
+ *
+ * @return the size of the destination or the claim
+ * @param claim to export from
+ * @param destination where to export data
+ * @param append if true appends to the destination; false overwrites
+ * @param offset the offset at which the claim should start being copied
+ * @param length the number of bytes to copy
+ * @throws IOException if an IO error occurs. The state of the content for
+ * the given destination is unknown and callers should consider whether they
+ * should clean up any partially created paths
+ */
+ long exportTo(ContentClaim claim, Path destination, boolean append, long offset, long length) throws IOException;
+
+ /**
+ * Exports the content of the given claim to the given destination.
+ *
+ * @return the size of the claim
+ * @param claim to export from
+ * @param destination where to export data
+ * @throws IOException if an IO error occurs.
+ */
+ long exportTo(ContentClaim claim, OutputStream destination) throws IOException;
+
+ /**
+ * Exports a subset of the content of the given claim, starting at offset
+ * and copying length bytes, to the given destination.
+ *
+ * @return the number of bytes copied
+ * @param claim to export from
+ * @param destination where to export data
+ * @param offset the offset into the claim at which the copy should begin
+ * @param length the number of bytes to copy
+ * @throws IOException if an IO error occurs.
+ */
+ long exportTo(ContentClaim claim, OutputStream destination, long offset, long length) throws IOException;
+
+ /**
+ * @param claim to get size of
+ * @return size in bytes of content for given claim
+ * @throws IOException if size check failed
+ */
+ long size(ContentClaim claim) throws IOException;
+
+ /**
+ * Provides access to the input stream for the given claim
+ *
+ * @param claim to read from
+ * @return InputStream over the content of the given claim
+ * @throws IOException if unable to read
+ */
+ InputStream read(ContentClaim claim) throws IOException;
+
+ /**
+ * Obtains an OutputStream to the content for the given claim.
+ *
+ * @param claim to write to
+ * @return the stream to write to
+ * @throws IOException if unable to obtain stream
+ */
+ OutputStream write(ContentClaim claim) throws IOException;
+
+ /**
+ * Purges the contents of the repository, as if the repository were newly
+ * created.
+ */
+ void purge();
+
+ /**
+ * Performs any cleanup actions that may need to be taken upon system
+ * restart. For example, if content was partially written to the repository
+ * before the restart, the repository is given a chance to handle this data
+ */
+ void cleanup();
+
+ /**
+ * @param contentClaim the Content Claim to check
+ * @return Returns a boolean indicating whether or not the content specified
+ * by the given claim can be read, regardless of whether the content has
+ * been archived or not. If the ContentRepository does not implement
+ * archiving capabilities, this method will return <code>false</code>
+ *
+ * @throws IOException if unable to determine accessibility
+ */
+ boolean isAccessible(ContentClaim contentClaim) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1129706/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRecord.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRecord.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRecord.java
new file mode 100644
index 0000000..b358cd9
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRecord.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository;
+
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.flowfile.FlowFile;
+
+/**
+ * <code>FlowFileRecord</code> is a sub-interface of <code>FlowFile</code>
+ * that exposes additional details about a FlowFile which are valuable to the
+ * framework but should be hidden from components
+ */
+public interface FlowFileRecord extends FlowFile {
+
+ /**
+ * @return the time (in millis since epoch) at which this FlowFile should no
+ * longer be penalized
+ */
+ long getPenaltyExpirationMillis();
+
+ /**
+ * @return the {@link ContentClaim} that holds the FlowFile's content
+ */
+ ContentClaim getContentClaim();
+
+ /**
+ * @return the byte offset into the {@link ContentClaim} at which the
+ * FlowFile's content occurs. This mechanism allows multiple FlowFiles to
+ * share the same ContentClaim, which can be significantly more efficient
+ * for some implementations of
+ * {@link org.apache.nifi.controller.repository.ContentRepository ContentRepository}
+ */
+ long getContentClaimOffset();
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1129706/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
new file mode 100644
index 0000000..906cbe2
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+
+/**
+ * Defines the contract for a FlowFile Repository, which is also
+ * {@link Closeable}. Implementations must be thread safe
+ *
+ */
+public interface FlowFileRepository extends Closeable {
+
+ /**
+ * Initializes the FlowFile Repository, providing to it the
+ * ResourceClaimManager that is to be used for interacting with claims
+ *
+ * @param claimManager for handling claims
+ * @throws java.io.IOException if unable to initialize repository
+ */
+ void initialize(ResourceClaimManager claimManager) throws IOException;
+
+ /**
+ * @return the maximum number of bytes that can be stored in the underlying
+ * storage mechanism
+ *
+ * @throws IOException if computing capacity fails
+ */
+ long getStorageCapacity() throws IOException;
+
+ /**
+ * @return the number of bytes currently available for use by the underlying
+ * storage mechanism
+ *
+ * @throws IOException if computing usable space fails
+ */
+ long getUsableStorageSpace() throws IOException;
+
+ /**
+ * Updates the repository with the given RepositoryRecords.
+ *
+ * @param records the records to update the repository with
+ * @throws java.io.IOException if update fails
+ */
+ void updateRepository(Collection<RepositoryRecord> records) throws IOException;
+
+ /**
+ * Loads all flow files found within the repository, establishes the content
+ * claims and their reference count
+ *
+ * @param queueProvider the provider of FlowFile Queues into which the
+ * FlowFiles should be enqueued
+ * @param minimumSequenceNumber specifies the minimum value that should be
+ * returned by a call to {@link #getNextFlowFileSequence()}
+ *
+ * @return index of highest flow file identifier
+ * @throws IOException if load fails
+ */
+ long loadFlowFiles(QueueProvider queueProvider, long minimumSequenceNumber) throws IOException;
+
+ /**
+ * @return <code>true</code> if the Repository is volatile (i.e., its data
+ * is lost upon application restart), <code>false</code> otherwise
+ */
+ boolean isVolatile();
+
+ /**
+ * @return the next ID in sequence for creating <code>FlowFile</code>s.
+ */
+ long getNextFlowFileSequence();
+
+ /**
+ * @return the max ID of all <code>FlowFile</code>s that currently exist in
+ * the repository.
+ * @throws IOException if computing max identifier fails
+ */
+ long getMaxFlowFileIdentifier() throws IOException;
+
+ /**
+ * Updates the Repository to indicate that the given FlowFileRecords were
+ * Swapped Out of memory
+ *
+ * @param swappedOut the FlowFiles that were swapped out of memory
+ * @param flowFileQueue the queue that the FlowFiles belong to
+ * @param swapLocation the location to which the FlowFiles were swapped
+ *
+ * @throws IOException if swap fails
+ */
+ void swapFlowFilesOut(List<FlowFileRecord> swappedOut, FlowFileQueue flowFileQueue, String swapLocation) throws IOException;
+
+ /**
+ * Updates the Repository to indicate that the given FlowFileRecords were
+ * Swapped In to memory
+ *
+ * @param swapLocation the location (e.g., a filename) from which FlowFiles
+ * were recovered
+ * @param flowFileRecords the records that were swapped in
+ * @param flowFileQueue the queue that the FlowFiles belong to
+ *
+ * @throws IOException if swap fails
+ */
+ void swapFlowFilesIn(String swapLocation, List<FlowFileRecord> flowFileRecords, FlowFileQueue flowFileQueue) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1129706/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
new file mode 100644
index 0000000..7092a6f
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+
+/**
+ * Defines a mechanism by which FlowFiles can be moved into external storage or
+ * memory so that they can be removed from the Java heap and vice-versa
+ */
+// TODO: This needs to be refactored into two different mechanisms, one that is responsible for doing
+// framework-y types of things, such as updating the repositories, and another that is responsible
+// for serializing and deserializing FlowFiles to external storage.
+public interface FlowFileSwapManager {
+
+ /**
+ * Initializes the Swap Manager, providing a {@link SwapManagerInitializationContext} so that the
+ * Swap Manager has access to all of the components necessary to perform its functions
+ *
+ * @param initializationContext the context that provides the swap manager with access to the
+ * resources that it needs to perform its functions
+ */
+ void initialize(SwapManagerInitializationContext initializationContext);
+
+ /**
+ * Swaps out the given FlowFiles that belong to the given queue.
+ *
+ * @param flowFiles the FlowFiles to swap out to external storage
+ * @param flowFileQueue the queue that the FlowFiles belong to
+ * @return the location of the externally stored swap file
+ *
+ * @throws IOException if unable to swap the FlowFiles out
+ */
+ String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException;
+
+ /**
+ * Recovers the FlowFiles from the swap file that lives at the given location. This action
+ * provides a view of the FlowFiles but does not actively swap them in, meaning that the swap file
+ * at the given location remains in that location and the FlowFile Repository is not updated.
+ *
+ * @param swapLocation the location of the swap file
+ * @param flowFileQueue the queue that the FlowFiles belong to
+ * @return a SwapContents that includes the FlowFiles that live at the given swap location
+ *
+ * @throws IncompleteSwapFileException if the swap file could not be completely read because the
+ * data was not all present; see {@link IncompleteSwapFileException#getPartialContents()}
+ * @throws IOException if unable to recover the FlowFiles from the given location
+ */
+ SwapContents peek(String swapLocation, FlowFileQueue flowFileQueue) throws IncompleteSwapFileException, IOException;
+
+ /**
+ * Recovers the FlowFiles from the swap file that lives at the given location and belongs
+ * to the given FlowFile Queue. The FlowFile Repository is then updated
+ * and the swap file is permanently removed from the external storage
+ *
+ * @param swapLocation the location of the swap file
+ * @param flowFileQueue the queue to which the FlowFiles belong
+ *
+ * @return a SwapContents that includes FlowFiles that are stored in the given location
+ *
+ * @throws IncompleteSwapFileException if the swap file could not be completely read because the
+ * data was not all present; see {@link IncompleteSwapFileException#getPartialContents()}
+ * @throws IOException if unable to recover the FlowFiles from the given location or update the
+ * FlowFileRepository
+ */
+ SwapContents swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IncompleteSwapFileException, IOException;
+
+ /**
+ * Determines swap files that exist for the given FlowFileQueue
+ *
+ * @param flowFileQueue the queue for which the FlowFiles should be recovered
+ *
+ * @return all swap locations that have been identified for the given queue, in the order that they should
+ * be swapped back in
+ *
+ * @throws IOException if unable to determine the swap locations for the given queue
+ */
+ List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException;
+
+ /**
+ * Parses the contents of the swap file at the given location and provides a SwapSummary that
+ * describes the data stored within the swap file
+ *
+ * @param swapLocation the location of the swap file
+ * @return a SwapSummary that provides information about what is contained within the swap file
+ * @throws IOException if unable to read or parse the swap file
+ */
+ SwapSummary getSwapSummary(String swapLocation) throws IOException;
+
+ /**
+ * Purge all known Swap Files without updating FlowFileRepository or Provenance Repository
+ */
+ void purge();
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1129706/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/IncompleteSwapFileException.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/IncompleteSwapFileException.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/IncompleteSwapFileException.java
new file mode 100644
index 0000000..4408f02
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/IncompleteSwapFileException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.io.EOFException;
+
+/**
+ * Signals that a Swap File could not be completely read in/parsed because the data was
+ * not all present. The exception records the location of the swap file and the
+ * partial contents that were handed to it by the code that detected the problem.
+ */
+public class IncompleteSwapFileException extends EOFException {
+ private static final long serialVersionUID = -6818558584430076898L;
+
+ private final String swapLocation;
+ private final SwapContents partialContents;
+
+ /**
+ * Creates an exception for the swap file at the given location.
+ *
+ * @param swapLocation the location of the swap file that could not be completely read
+ * @param partialContents the partial contents of the swap file
+ */
+ public IncompleteSwapFileException(final String swapLocation, final SwapContents partialContents) {
+ // Pass a detail message so that the swap location is visible in logs and stack
+ // traces; previously getMessage() was always null.
+ super("Swap file at " + swapLocation + " is incomplete");
+ this.swapLocation = swapLocation;
+ this.partialContents = partialContents;
+ }
+
+ /**
+ * @return the location of the swap file that could not be completely read
+ */
+ public String getSwapLocation() {
+ return swapLocation;
+ }
+
+ /**
+ * @return the partial contents that were supplied when this exception was created
+ */
+ public SwapContents getPartialContents() {
+ return partialContents;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1129706/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java
new file mode 100644
index 0000000..95d9f2e
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository;
+
+import java.util.Collection;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+
+/**
+ * Provides a collection of <code>FlowFileQueue</code>s that represents all
+ * queues in the current flow
+ */
+public interface QueueProvider {
+
+ /**
+ * @return a Collection of all <code>FlowFileQueue</code>s that currently
+ * exist in the flow
+ */
+ Collection<FlowFileQueue> getAllQueues();
+}