You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/20 15:01:49 UTC
[13/42] incubator-nifi git commit: Merge branch 'develop' of
https://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
index 0000000,31e5105..da80546
mode 000000,100644..100644
--- a/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
+++ b/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
@@@ -1,0 -1,293 +1,293 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.stream.io.BufferedInputStream;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.processor.exception.FlowFileAccessException;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processors.standard.util.FileInfo;
+ import org.apache.nifi.processors.standard.util.FileTransfer;
+ import org.apache.nifi.processors.standard.util.SFTPTransfer;
+ import org.apache.nifi.util.ObjectHolder;
+ import org.apache.nifi.util.StopWatch;
+
+ import java.io.File;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.Set;
+ import java.util.concurrent.TimeUnit;
+
+ /**
+ * Base class for PutFTP &amp; PutSFTP
+ *
+ * @param <T> the {@link FileTransfer} implementation used to send FlowFiles
+ */
+ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractProcessor {
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that are successfully sent will be routed to success").build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles that failed to send to the remote system; failure is usually looped back to this processor").build();
+ public static final Relationship REL_REJECT = new Relationship.Builder().name("reject").description("FlowFiles that were rejected by the destination system").build();
+
+ private final Set<Relationship> relationships;
+
+ /**
+ * Creates the processor and fixes its relationships to success, failure
+ * and reject, exposed through a read-only set.
+ */
+ public PutFileTransfer() {
+ super();
+ final Set<Relationship> supported = new HashSet<>();
+ supported.add(REL_SUCCESS);
+ supported.add(REL_FAILURE);
+ supported.add(REL_REJECT);
+ relationships = Collections.unmodifiableSet(supported);
+ }
+
+ /**
+ * @return the unmodifiable set of relationships built in the constructor
+ */
+ @Override
+ public Set<Relationship> getRelationships() {
+ return this.relationships;
+ }
+
+ /**
+ * Supplies the transfer client used by onTrigger. onTrigger opens the
+ * returned instance in a try-with-resources statement, so it is closed
+ * when the trigger finishes.
+ *
+ * @param context the context of the current trigger
+ * @return the transfer implementation used to send FlowFiles
+ */
+ protected abstract T getFileTransfer(final ProcessContext context);
+
+ /**
+ * Hook invoked by onTrigger immediately before a FlowFile's content is
+ * read and sent; only called for FlowFiles that are actually transferred.
+ * The default implementation does nothing.
+ *
+ * @param flowFile the FlowFile about to be sent
+ * @param context the context of the current trigger
+ * @param transfer the transfer client that will perform the put
+ * @throws IOException if the hook fails; onTrigger catches IOException and routes the FlowFile to failure
+ */
+ protected void beforePut(final FlowFile flowFile, final ProcessContext context, final T transfer) throws IOException {
+
+ }
+
+ /**
+ * Hook invoked by onTrigger right after a FlowFile's content has been
+ * handed to the transfer client. The default implementation does nothing.
+ *
+ * @param flowFile the FlowFile that was just sent
+ * @param context the context of the current trigger
+ * @param transfer the transfer client that performed the put
+ * @throws IOException if the hook fails; onTrigger catches IOException and routes the FlowFile to failure
+ */
+ protected void afterPut(final FlowFile flowFile, final ProcessContext context, final T transfer) throws IOException {
+
+ }
+
+ /**
+ * Sends FlowFiles to the remote system using one transfer client per
+ * trigger. Each FlowFile is checked for conflicts, optionally transferred,
+ * routed to the relationship chosen by the conflict check and committed
+ * individually. The loop continues until the processor is unscheduled, a
+ * relationship becomes unavailable, the batch size is reached or the
+ * queue is empty.
+ *
+ * @param context provides the processor configuration
+ * @param session the session used to obtain, read and route FlowFiles
+ */
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final ProcessorLog logger = getLogger();
+ // NOTE(review): the hostname is evaluated against the first FlowFile only, yet it is used in the log
+ // messages and provenance URI of every FlowFile in the batch - confirm this is intended.
+ final String hostname = context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
+
+ final int maxNumberOfFiles = context.getProperty(FileTransfer.BATCH_SIZE).asInteger();
+ int fileCount = 0;
+ try (final T transfer = getFileTransfer(context)) {
+ do {
+ final String rootPath = context.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions(flowFile).getValue();
+ final String workingDirPath;
+ if (rootPath == null) {
+ workingDirPath = null;
+ } else {
+ File workingDirectory = new File(rootPath);
+ // A path that does not start with a slash or backslash is resolved against the remote home directory.
+ if (!workingDirectory.getPath().startsWith("/") && !workingDirectory.getPath().startsWith("\\")) {
+ workingDirectory = new File(transfer.getHomeDirectory(flowFile), workingDirectory.getPath());
+ }
+ // Normalize to forward slashes regardless of the local platform's separator.
+ workingDirPath = workingDirectory.getPath().replace("\\", "/");
+ }
+
+ final boolean rejectZeroByteFiles = context.getProperty(FileTransfer.REJECT_ZERO_BYTE).asBoolean();
+ final ConflictResult conflictResult = identifyAndResolveConflictFile(context.getProperty(FileTransfer.CONFLICT_RESOLUTION).getValue(),
+ transfer, workingDirPath, flowFile, rejectZeroByteFiles, logger);
+
+ if (conflictResult.isTransfer()) {
+ final StopWatch stopWatch = new StopWatch();
+ stopWatch.start();
+
+ beforePut(flowFile, context, transfer);
+ // The anonymous callback can only capture final locals, hence the copy and the holder.
+ final FlowFile flowFileToTransfer = flowFile;
+ final ObjectHolder<String> fullPathRef = new ObjectHolder<>(null);
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ try (final InputStream bufferedIn = new BufferedInputStream(in)) {
+ if (workingDirPath != null && context.getProperty(SFTPTransfer.CREATE_DIRECTORY).asBoolean()) {
+ transfer.ensureDirectoryExists(flowFileToTransfer, new File(workingDirPath));
+ }
+
+ fullPathRef.set(transfer.put(flowFileToTransfer, workingDirPath, conflictResult.getFileName(), bufferedIn));
+ }
+ }
+ });
+ afterPut(flowFile, context, transfer);
+
+ stopWatch.stop();
+ final String dataRate = stopWatch.calculateDataRate(flowFile.getSize());
+ final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+ logger.info("Successfully transfered {} to {} on remote host {} in {} milliseconds at a rate of {}",
+ new Object[]{flowFile, fullPathRef.get(), hostname, millis, dataRate});
+
+ // Ensure the path starts with a slash so it can be appended directly to the host in the provenance URI.
+ String fullPathWithSlash = fullPathRef.get();
+ if (!fullPathWithSlash.startsWith("/")) {
+ fullPathWithSlash = "/" + fullPathWithSlash;
+ }
+ final String destinationUri = transfer.getProtocolName() + "://" + hostname + fullPathWithSlash;
+ session.getProvenanceReporter().send(flowFile, destinationUri, millis);
+ }
+
+ if (conflictResult.isPenalize()) {
+ flowFile = session.penalize(flowFile);
+ }
+
+ session.transfer(flowFile, conflictResult.getRelationship());
+ session.commit();
+ // Keep going only while still scheduled, every relationship is available, the batch limit is not
+ // reached and another FlowFile is queued (the condition also fetches that next FlowFile).
- } while (isScheduled() && (getRelationships().size() == session.getAvailableRelationships().size()) && (++fileCount < maxNumberOfFiles) && ((flowFile = session.get()) != null));
++ } while (isScheduled() && (getRelationships().size() == context.getAvailableRelationships().size()) && (++fileCount < maxNumberOfFiles) && ((flowFile = session.get()) != null));
+ } catch (final IOException e) {
+ // Every failure path yields the processor, penalizes the current FlowFile and routes it to failure.
+ context.yield();
+ logger.error("Unable to transfer {} to remote host {} due to {}", new Object[]{flowFile, hostname, e});
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ } catch (final FlowFileAccessException e) {
+ context.yield();
+ logger.error("Unable to transfer {} to remote host {} due to {}", new Object[]{flowFile, hostname, e.getCause()});
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ } catch (final ProcessException e) {
+ context.yield();
+ logger.error("Unable to transfer {} to remote host {} due to {}: {}; routing to failure", new Object[]{flowFile, hostname, e, e.getCause()});
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
+
+ /**
+ * Attempts to identify naming or content issues with a FlowFile before it
+ * is transferred and decides how the FlowFile must be handled.
+ *
+ * @param conflictResolutionType the configured conflict resolution strategy (compared case-insensitively)
+ * @param transfer the transfer client used to inspect and modify the remote system
+ * @param path the remote directory the file would be written to
+ * @param flowFile the FlowFile being considered
+ * @param rejectZeroByteFiles whether FlowFiles with a size of zero must be rejected
+ * @param logger the logger used to report the decision
+ * @return the relationship to route to, whether to transfer, the file name to use and whether to penalize
+ * @throws IOException if the remote system cannot be queried or the existing remote file cannot be deleted
+ */
+ private ConflictResult identifyAndResolveConflictFile(final String conflictResolutionType, final T transfer, final String path, final FlowFile flowFile, final boolean rejectZeroByteFiles, final ProcessorLog logger) throws IOException {
+ Relationship destinationRelationship = REL_SUCCESS;
+ String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
+ boolean transferFile = true;
+ boolean penalizeFile = false;
+
+ //First, check if the file is empty
+ //Reject (and penalize) files that are zero bytes
+ if (rejectZeroByteFiles) {
+ final long sizeInBytes = flowFile.getSize();
+ if (sizeInBytes == 0) {
+ logger.warn("Rejecting {} because it is zero bytes", new Object[]{flowFile});
+ return new ConflictResult(REL_REJECT, false, fileName, true);
+ }
+ }
+
+ //Second, check if the user doesn't care about detecting naming conflicts ahead of time
+ if (conflictResolutionType.equalsIgnoreCase(FileTransfer.CONFLICT_RESOLUTION_NONE)) {
+ return new ConflictResult(destinationRelationship, transferFile, fileName, penalizeFile);
+ }
+
+ //Third, no remote file of that name means there is no conflict to resolve
+ final FileInfo remoteFileInfo = transfer.getRemoteFileInfo(flowFile, path, fileName);
+ if (remoteFileInfo == null) {
+ return new ConflictResult(destinationRelationship, transferFile, fileName, penalizeFile);
+ }
+
+ //A conflict with a remote directory is always rejected, whatever strategy is configured
+ if (remoteFileInfo.isDirectory()) {
+ logger.info("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile});
+ return new ConflictResult(REL_REJECT, false, fileName, false);
+ }
+
+ logger.info("Discovered a filename conflict on the remote server for {} so handling using configured Conflict Resolution of {}",
+ new Object[]{flowFile, conflictResolutionType});
+
+ switch (conflictResolutionType.toUpperCase()) {
+ case FileTransfer.CONFLICT_RESOLUTION_REJECT:
+ destinationRelationship = REL_REJECT;
+ transferFile = false;
+ penalizeFile = false;
+ logger.info("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile});
+ break;
+ case FileTransfer.CONFLICT_RESOLUTION_REPLACE:
+ transfer.deleteFile(path, fileName);
+ destinationRelationship = REL_SUCCESS;
+ transferFile = true;
+ penalizeFile = false;
+ logger.info("Resolving filename conflict for {} with remote server by deleting remote file and replacing with flow file", new Object[]{flowFile});
+ break;
+ case FileTransfer.CONFLICT_RESOLUTION_RENAME:
+ //Try the prefixes "1." through "99." until a name is found that does not exist remotely
+ boolean uniqueNameGenerated = false;
+ for (int i = 1; i < 100 && !uniqueNameGenerated; i++) {
+ String possibleFileName = i + "." + fileName;
+
+ final FileInfo renamedFileInfo = transfer.getRemoteFileInfo(flowFile, path, possibleFileName);
+ uniqueNameGenerated = (renamedFileInfo == null);
+ if (uniqueNameGenerated) {
+ fileName = possibleFileName;
+ logger.info("Attempting to resolve filename conflict for {} on the remote server by using a newly generated filename of: {}", new Object[]{flowFile, fileName});
+ destinationRelationship = REL_SUCCESS;
+ transferFile = true;
+ penalizeFile = false;
+ break;
+ }
+ }
+ if (!uniqueNameGenerated) {
+ destinationRelationship = REL_REJECT;
+ transferFile = false;
+ penalizeFile = false;
+ logger.info("Could not determine a unique name after 99 attempts for {}. Switching resolution mode to REJECT", new Object[]{flowFile});
+ }
+ break;
+ case FileTransfer.CONFLICT_RESOLUTION_IGNORE:
+ destinationRelationship = REL_SUCCESS;
+ transferFile = false;
+ penalizeFile = false;
+ logger.info("Resolving conflict for {} by not transferring file and and still considering the process a success.", new Object[]{flowFile});
+ break;
+ case FileTransfer.CONFLICT_RESOLUTION_FAIL:
+ destinationRelationship = REL_FAILURE;
+ transferFile = false;
+ penalizeFile = true;
+ logger.info("Resolved filename conflict for {} as configured by routing to FAILURE relationship.", new Object[]{flowFile});
+ break;
+ default:
+ //Unknown strategy: keep the initial values (transfer the file and route to success)
+ break;
+ }
+
+ return new ConflictResult(destinationRelationship, transferFile, fileName, penalizeFile);
+ }
+
+ /**
+ * Immutable value object carrying the outcome of conflict detection: the
+ * relationship to route to, whether the file should be sent, the file name
+ * to send it under and whether the FlowFile should be penalized.
+ */
+ private static class ConflictResult {
+
+ final Relationship relationship;
+ final boolean transferFile;
+ final String newFileName;
+ final boolean penalizeFile;
+
+ public ConflictResult(final Relationship relationship, final boolean transferFileVal, final String newFileNameVal, final boolean penalizeFileVal) {
+ this.penalizeFile = penalizeFileVal;
+ this.newFileName = newFileNameVal;
+ this.transferFile = transferFileVal;
+ this.relationship = relationship;
+ }
+
+ /** @return the relationship the FlowFile should be routed to */
+ public Relationship getRelationship() {
+ return this.relationship;
+ }
+
+ /** @return the file name to use on the remote system */
+ public String getFileName() {
+ return this.newFileName;
+ }
+
+ /** @return true if the FlowFile's content should be sent */
+ public boolean isTransfer() {
+ return this.transferFile;
+ }
+
+ /** @return true if the FlowFile should be penalized before routing */
+ public boolean isPenalize() {
+ return this.penalizeFile;
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index 0000000,cae61f0..1cf5f1f
mode 000000,100644..100644
--- a/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++ b/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@@ -1,0 -1,320 +1,325 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard.servlets;
+
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.security.cert.X509Certificate;
+ import java.util.Enumeration;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.UUID;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.regex.Pattern;
+ import java.util.zip.GZIPInputStream;
+
+ import javax.servlet.ServletConfig;
+ import javax.servlet.ServletContext;
+ import javax.servlet.ServletException;
+ import javax.servlet.http.HttpServlet;
+ import javax.servlet.http.HttpServletRequest;
+ import javax.servlet.http.HttpServletResponse;
+ import javax.ws.rs.Path;
+ import javax.ws.rs.core.MediaType;
+
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.stream.io.BufferedOutputStream;
+ import org.apache.nifi.stream.io.StreamThrottler;
+ import org.apache.nifi.logging.ProcessorLog;
++import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessSessionFactory;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.processors.standard.ListenHTTP;
+ import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper;
+ import org.apache.nifi.util.FlowFileUnpackager;
+ import org.apache.nifi.util.FlowFileUnpackagerV1;
+ import org.apache.nifi.util.FlowFileUnpackagerV2;
+ import org.apache.nifi.util.FlowFileUnpackagerV3;
+ import org.apache.commons.io.IOUtils;
+ import org.apache.commons.lang3.StringUtils;
+
+ @Path(ListenHTTP.URI)
+ public class ListenHTTPServlet extends HttpServlet {
+
+ private static final long serialVersionUID = 5329940480987723163L;
+
+ public static final String FLOWFILE_CONFIRMATION_HEADER = "x-prefer-acknowledge-uri";
+ public static final String LOCATION_HEADER_NAME = "Location";
+ public static final String DEFAULT_FOUND_SUBJECT = "none";
+ public static final String APPLICATION_FLOW_FILE_V1 = "application/flowfile";
+ public static final String APPLICATION_FLOW_FILE_V2 = "application/flowfile-v2";
+ public static final String APPLICATION_FLOW_FILE_V3 = "application/flowfile-v3";
+ public static final String LOCATION_URI_INTENT_NAME = "x-location-uri-intent";
+ public static final String LOCATION_URI_INTENT_VALUE = "flowfile-hold";
+ public static final int FILES_BEFORE_CHECKING_DESTINATION_SPACE = 5;
+ public static final String ACCEPT_HEADER_NAME = "Accept";
+ public static final String ACCEPT_HEADER_VALUE = APPLICATION_FLOW_FILE_V3 + "," + APPLICATION_FLOW_FILE_V2 + "," + APPLICATION_FLOW_FILE_V1 + ",*/*;q=0.8";
+ public static final String ACCEPT_ENCODING_NAME = "Accept-Encoding";
+ public static final String ACCEPT_ENCODING_VALUE = "gzip";
+ public static final String GZIPPED_HEADER = "flowfile-gzipped";
+ public static final String PROTOCOL_VERSION_HEADER = "x-nifi-transfer-protocol-version";
+ public static final String PROTOCOL_VERSION = "3";
+
+ private final AtomicLong filesReceived = new AtomicLong(0L);
+ private final AtomicBoolean spaceAvailable = new AtomicBoolean(true);
+
+ private ProcessorLog logger;
+ private AtomicReference<ProcessSessionFactory> sessionFactoryHolder;
++ private volatile ProcessContext processContext;
+ private Pattern authorizedPattern;
+ private Pattern headerPattern;
+ private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap;
+ private StreamThrottler streamThrottler;
+
+ /**
+ * Initializes the servlet by reading its collaborators (logger, session
+ * factory holder, process context, authorization and header patterns,
+ * FlowFile hold map and stream throttler) from the servlet context
+ * attributes named by the ListenHTTP constants.
+ *
+ * @param config the servlet configuration giving access to the servlet context
+ * @throws ServletException declared by the servlet contract; not thrown by this implementation directly
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(final ServletConfig config) throws ServletException {
+ final ServletContext context = config.getServletContext();
+ this.logger = (ProcessorLog) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER);
+ this.sessionFactoryHolder = (AtomicReference<ProcessSessionFactory>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER);
++ this.processContext = (ProcessContext) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER);
+ this.authorizedPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN);
+ this.headerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN);
+ this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP);
+ this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER);
+ }
+
+ /**
+ * Answers a HEAD request by advertising, through response headers, the
+ * accepted content encoding (gzip), the accepted content types (the three
+ * FlowFile package formats plus a wildcard) and the transfer protocol version.
+ */
+ @Override
+ protected void doHead(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
+ response.addHeader(ACCEPT_ENCODING_NAME, ACCEPT_ENCODING_VALUE);
+ response.addHeader(ACCEPT_HEADER_NAME, ACCEPT_HEADER_VALUE);
+ response.addHeader(PROTOCOL_VERSION_HEADER, PROTOCOL_VERSION);
+ }
+
+ @Override
+ protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
++ final ProcessContext context = processContext;
++
+ ProcessSessionFactory sessionFactory;
+ do {
+ sessionFactory = sessionFactoryHolder.get();
+ if (sessionFactory == null) {
+ try {
+ Thread.sleep(10);
+ } catch (final InterruptedException e) {
+ }
+ }
+ } while (sessionFactory == null);
+
+ final ProcessSession session = sessionFactory.createSession();
+ FlowFile flowFile = null;
+ String holdUuid = null;
+ String foundSubject = null;
+ try {
+ final long n = filesReceived.getAndIncrement() % FILES_BEFORE_CHECKING_DESTINATION_SPACE;
+ if (n == 0 || !spaceAvailable.get()) {
- if (session.getAvailableRelationships().isEmpty()) {
++ if (context.getAvailableRelationships().isEmpty()) {
+ spaceAvailable.set(false);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received request from " + request.getRemoteHost() + " but no space available; Indicating Service Unavailable");
+ }
+ response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+ return;
+ } else {
+ spaceAvailable.set(true);
+ }
+ }
+ response.setHeader("Content-Type", MediaType.TEXT_PLAIN);
+
+ final boolean contentGzipped = Boolean.parseBoolean(request.getHeader(GZIPPED_HEADER));
+
+ final X509Certificate[] certs = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
+ foundSubject = DEFAULT_FOUND_SUBJECT;
+ if (certs != null && certs.length > 0) {
+ for (final X509Certificate cert : certs) {
+ foundSubject = cert.getSubjectDN().getName();
+ if (authorizedPattern.matcher(foundSubject).matches()) {
+ break;
+ } else {
+ logger.warn("Rejecting transfer attempt from " + foundSubject + " because the DN is not authorized, host=" + request.getRemoteHost());
+ response.sendError(HttpServletResponse.SC_FORBIDDEN, "not allowed based on dn");
+ return;
+ }
+ }
+ }
+
+ final String destinationVersion = request.getHeader(PROTOCOL_VERSION_HEADER);
+ Integer protocolVersion = null;
+ if (destinationVersion != null) {
+ try {
+ protocolVersion = Integer.valueOf(destinationVersion);
+ } catch (final NumberFormatException e) {
+ // Value was invalid. Treat as if the header were missing.
+ }
+ }
+
+ final boolean destinationIsLegacyNiFi = (protocolVersion == null);
+ final boolean createHold = Boolean.parseBoolean(request.getHeader(FLOWFILE_CONFIRMATION_HEADER));
+ final String contentType = request.getContentType();
+
+ final InputStream unthrottled = contentGzipped ? new GZIPInputStream(request.getInputStream()) : request.getInputStream();
+
+ final InputStream in = (streamThrottler == null) ? unthrottled : streamThrottler.newThrottledInputStream(unthrottled);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received request from " + request.getRemoteHost() + ", createHold=" + createHold + ", content-type=" + contentType + ", gzip=" + contentGzipped);
+ }
+
+ final AtomicBoolean hasMoreData = new AtomicBoolean(false);
+ final FlowFileUnpackager unpackager;
+ if (APPLICATION_FLOW_FILE_V3.equals(contentType)) {
+ unpackager = new FlowFileUnpackagerV3();
+ } else if (APPLICATION_FLOW_FILE_V2.equals(contentType)) {
+ unpackager = new FlowFileUnpackagerV2();
+ } else if (APPLICATION_FLOW_FILE_V1.equals(contentType)) {
+ unpackager = new FlowFileUnpackagerV1();
+ } else {
+ unpackager = null;
+ }
+
+ final Set<FlowFile> flowFileSet = new HashSet<>();
+
+ do {
+ final long startNanos = System.nanoTime();
+ final Map<String, String> attributes = new HashMap<>();
+ flowFile = session.create();
+ flowFile = session.write(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream rawOut) throws IOException {
+ try (final BufferedOutputStream bos = new BufferedOutputStream(rawOut, 65536)) {
+ if (unpackager == null) {
+ IOUtils.copy(in, bos);
+ hasMoreData.set(false);
+ } else {
+ attributes.putAll(unpackager.unpackageFlowFile(in, bos));
+
+ if (destinationIsLegacyNiFi) {
+ if (attributes.containsKey("nf.file.name")) {
+ // for backward compatibility with old nifi...
+ attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name"));
+ }
+
+ if (attributes.containsKey("nf.file.path")) {
+ attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path"));
+ }
+ }
+
+ // remove deprecated FlowFile attribute that was used in older versions of NiFi
+ attributes.remove("parent.uuid");
+
+ hasMoreData.set(unpackager.hasMoreData());
+ }
+ }
+ }
+ });
+
+ final long transferNanos = System.nanoTime() - startNanos;
+ final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+
+ // put metadata on flowfile
+ final String nameVal = request.getHeader(CoreAttributes.FILENAME.key());
+ if (StringUtils.isNotBlank(nameVal)) {
+ attributes.put(CoreAttributes.FILENAME.key(), nameVal);
+ }
+
+ // put arbitrary headers on flow file
+ for(Enumeration<String> headerEnum = request.getHeaderNames();
+ headerEnum.hasMoreElements(); ) {
+ String headerName = headerEnum.nextElement();
+ if (headerPattern != null && headerPattern.matcher(headerName).matches()) {
+ String headerValue = request.getHeader(headerName);
+ attributes.put(headerName, headerValue);
+ }
+ }
+
+ String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key());
+ if (sourceSystemFlowFileIdentifier != null) {
+ sourceSystemFlowFileIdentifier = "urn:nifi:" + sourceSystemFlowFileIdentifier;
+
+ // If we receveied a UUID, we want to give the FlowFile a new UUID and register the sending system's
+ // identifier as the SourceSystemFlowFileIdentifier field in the Provenance RECEIVE event
+ attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
+ }
+
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote DN=" + foundSubject, transferMillis);
+ flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject);
+ flowFileSet.add(flowFile);
+
+ if (holdUuid == null) {
+ holdUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
+ }
+ } while (hasMoreData.get());
+
+ if (createHold) {
+ String uuid = (holdUuid == null) ? UUID.randomUUID().toString() : holdUuid;
+
+ if (flowFileMap.containsKey(uuid)) {
+ uuid = UUID.randomUUID().toString();
+ }
+
+ final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis());
+ FlowFileEntryTimeWrapper previousWrapper;
+ do {
+ previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper);
+ if (previousWrapper != null) {
+ uuid = UUID.randomUUID().toString();
+ }
+ } while (previousWrapper != null);
+
+ response.setStatus(HttpServletResponse.SC_SEE_OTHER);
+ final String ackUri = ListenHTTP.URI + "/holds/" + uuid;
+ response.addHeader(LOCATION_HEADER_NAME, ackUri);
+ response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE);
+ response.getOutputStream().write(ackUri.getBytes("UTF-8"));
+ if (logger.isDebugEnabled()) {
+ logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}]; placed hold on these {} files with ID {}",
+ new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet.size(), uuid});
+ }
+ } else {
+ response.setStatus(HttpServletResponse.SC_OK);
+ logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success' {}",
+ new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFile});
+
+ session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS);
+ session.commit();
+ }
+ } catch (final Throwable t) {
+ session.rollback();
+ if (flowFile == null) {
+ logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{request.getRemoteHost(), foundSubject, t});
+ } else {
+ logger.error("Unable to receive file {} from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{flowFile, request.getRemoteHost(), foundSubject, t});
+ }
+ response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString());
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
index 0000000,a6402e4..ab4c978
mode 000000,100644..100644
--- a/nifi/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
+++ b/nifi/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
@@@ -1,0 -1,138 +1,139 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
+ import org.apache.nifi.processors.standard.DistributeLoad;
+ import org.apache.nifi.util.TestRunner;
+ import org.apache.nifi.util.TestRunners;
+
+ import org.junit.Assert;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+
+ public class TestDistributeLoad {
+
+ /**
+ * Configures slf4j's simple logger through system properties before any
+ * test runs: info level by default, timestamps enabled and debug level
+ * for the logger named "nifi.processors.standard.DistributeLoad".
+ */
+ @BeforeClass
+ public static void before() {
+ System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+ System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+ System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.DistributeLoad", "debug");
+ }
+
+ @Test
+ public void testDefaultRoundRobin() {
+ final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad());
+ testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS, "100");
+
+ for (int i = 0; i < 101; i++) {
+ testRunner.enqueue(new byte[0]);
+ }
+
+ testRunner.run(101);
+ testRunner.assertTransferCount("1", 2);
+ for (int i = 2; i <= 100; i++) {
+ testRunner.assertTransferCount(String.valueOf(i), 1);
+ }
+ }
+
+ @Test
+ public void testWeightedRoundRobin() {
+ final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad());
+ testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS, "100");
+
+ testRunner.setProperty("1", "5");
+ testRunner.setProperty("2", "3");
+
+ for (int i = 0; i < 106; i++) {
+ testRunner.enqueue(new byte[0]);
+ }
+
+ testRunner.run(108);
+ testRunner.assertTransferCount("1", 5);
+ testRunner.assertTransferCount("2", 3);
+ for (int i = 3; i <= 100; i++) {
+ testRunner.assertTransferCount(String.valueOf(i), 1);
+ }
+ }
+
+ /**
+ * Exercises validation of the dynamic per-relationship weight properties:
+ * weights of "0" and "-1" and property names outside 1..100 ("101", "0",
+ * "-1") are expected to be refused, while weight "101" and property "100"
+ * are set without any check.
+ *
+ * NOTE(review): Assert.fail(...) throws AssertionError, which is the very
+ * type each surrounding catch block swallows. As written, none of the
+ * negative checks can fail the test, whether or not setProperty rejects
+ * the value - confirm how the test runner reports invalid values and
+ * tighten these checks.
+ */
+ @Test
+ public void testValidationOnAddedProperties() {
+ final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad());
+ testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS, "100");
+
+ testRunner.setProperty("1", "5");
+
+ try {
+ testRunner.setProperty("1", "0");
+ Assert.fail("Allows property '1' to be set to '0'");
+ } catch (final AssertionError e) {
+ // expected behavior
+ }
+
+ try {
+ testRunner.setProperty("1", "-1");
+ Assert.fail("Allows property '1' to be set to '-1'");
+ } catch (final AssertionError e) {
+ // expected behavior
+ }
+
+ testRunner.setProperty("1", "101");
+ testRunner.setProperty("100", "5");
+
+ try {
+ testRunner.setProperty("101", "5");
+ Assert.fail("Allows property '101' to be set to '5'");
+ } catch (final AssertionError e) {
+ // expected behavior
+ }
+
+ try {
+ testRunner.setProperty("0", "5");
+ Assert.fail("Allows property '0' to be set to '5'");
+ } catch (final AssertionError e) {
+ // expected behavior
+ }
+
+ try {
+ testRunner.setProperty("-1", "5");
+ Assert.fail("Allows property '-1' to be set to '5'");
+ } catch (final AssertionError e) {
+ // expected behavior
+ }
+ }
+
+ /**
+ * With the "next available" strategy and relationship "50" marked
+ * unavailable, 99 FlowFiles must be spread one each over the remaining
+ * 99 relationships, leaving the queue empty and "50" untouched.
+ */
+ @Test
+ public void testNextAvailable() {
+ final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad());
+
+ testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS.getName(), "100");
+ testRunner.setProperty(DistributeLoad.DISTRIBUTION_STRATEGY.getName(), DistributeLoad.STRATEGY_NEXT_AVAILABLE);
+
+ for (int i = 0; i < 99; i++) {
+ testRunner.enqueue(new byte[0]);
+ }
+
+ testRunner.setRelationshipUnavailable("50");
+
+ testRunner.run(101);
+ testRunner.assertQueueEmpty();
+
+ for (int i = 1; i <= 100; i++) {
+ testRunner.assertTransferCount(String.valueOf(i), (i == 50) ? 0 : 1);
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
----------------------------------------------------------------------
diff --cc nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
index 0000000,9e04439..7fa183f
mode 000000,100644..100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
@@@ -1,0 -1,124 +1,132 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processor;
+
+ import java.util.Map;
++import java.util.Set;
+
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.PropertyValue;
+ import org.apache.nifi.controller.ControllerServiceLookup;
+
+ /**
+ * <p>
+ * Provides a bridge between a Processor and the NiFi Framework
+ * </p>
+ *
+ * <p>
+ * <b>Note: </b>Implementations of this interface are NOT necessarily
+ * thread-safe.
+ * </p>
+ */
+ public interface ProcessContext {
+
+ /**
+ * Retrieves the current value set for the given descriptor, if a value is
+ * set - else uses the descriptor to determine the appropriate default value
+ *
+ * @param descriptor the descriptor of the property whose value is requested
+ * @return the {@link PropertyValue} for the given descriptor
+ */
+ PropertyValue getProperty(PropertyDescriptor descriptor);
+
+ /**
+ * Retrieves the current value set for the property with the given name, if
+ * a value is set - else uses the corresponding descriptor to determine the
+ * appropriate default value
+ *
+ * @param propertyName the name of the property whose value is requested
+ * @return the {@link PropertyValue} for the named property
+ */
+ PropertyValue getProperty(String propertyName);
+
+ /**
+ * Creates and returns a {@link PropertyValue} object that can be used for
+ * evaluating the value of the given String
+ *
+ * @param rawValue the raw String to wrap
+ * @return a {@link PropertyValue} that evaluates the given String
+ */
+ PropertyValue newPropertyValue(String rawValue);
+
+ /**
+ * <p>
+ * Causes the Processor not to be scheduled for some pre-configured amount
+ * of time. The duration of time for which the processor will not be
+ * scheduled is configured in the same manner as the processor's scheduling
+ * period.
+ * </p>
+ *
+ * <p>
+ * <b>Note: </b>This is NOT a blocking call and does not suspend execution
+ * of the current thread.
+ * </p>
+ */
+ void yield();
+
+ /**
+ * @return the maximum number of threads that may be executing this
+ * processor's code at any given time
+ */
+ int getMaxConcurrentTasks();
+
+ /**
+ * @return the annotation data configured for this processor
+ */
+ String getAnnotationData();
+
+ /**
+ * Returns a Map of all PropertyDescriptors to their configured values. This
+ * Map may or may not be modifiable, but modifying its values will not
+ * change the values of the processor's properties
+ *
+ * @return a Map of each {@link PropertyDescriptor} to its configured value
+ */
+ Map<PropertyDescriptor, String> getProperties();
+
+ /**
+ * Encrypts the given value using the password provided in the NiFi
+ * Properties
+ *
+ * @param unencrypted the value to encrypt
+ * @return the encrypted form of the given value
+ */
+ String encrypt(String unencrypted);
+
+ /**
+ * Decrypts the given value using the password provided in the NiFi
+ * Properties
+ *
+ * @param encrypted the value to decrypt
+ * @return the decrypted form of the given value
+ */
+ String decrypt(String encrypted);
+
+ /**
+ * Provides a {@code ControllerServiceLookup} that can be used to obtain a
+ * Controller Service
+ *
+ * @return a {@link ControllerServiceLookup} for obtaining Controller
+ * Services
+ */
+ ControllerServiceLookup getControllerServiceLookup();
++
++ /**
++ * @return the set of all relationships for which space is available to
++ * receive new objects
++ */
++ Set<Relationship> getAvailableRelationships();
++
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
----------------------------------------------------------------------
diff --cc nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
index 0000000,09d1bd2..d3de916
mode 000000,100644..100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
@@@ -1,0 -1,719 +1,713 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processor;
+
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.nio.file.Path;
+ import java.util.Collection;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.regex.Pattern;
+
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.processor.exception.FlowFileAccessException;
+ import org.apache.nifi.processor.exception.FlowFileHandlingException;
+ import org.apache.nifi.processor.exception.MissingFlowFileException;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.processor.io.StreamCallback;
+ import org.apache.nifi.provenance.ProvenanceReporter;
+
+ /**
+ * <p>
+ * A process session encompasses all the behaviors a processor can perform to
+ * obtain, clone, read, modify, and remove FlowFiles in an atomic unit. A process
+ * session is always tied to a single processor at any one time and ensures no
+ * FlowFile can ever be accessed by any more than one processor at a given time.
+ * The session also ensures that all FlowFiles are always accounted for. The
+ * creator of a ProcessSession is always required to manage the session.</p>
+ *
+ * <p>
+ * A session is not considered thread safe. The session supports a unit of work
+ * that is either committed or rolled back</p>
+ *
+ * <p>
+ * As noted on specific methods and for specific exceptions automated rollback
+ * will occur to ensure consistency of the repository. However, several
+ * situations can result in exceptions yet not cause automated rollback. In
+ * these cases the consistency of the repository will be retained but callers
+ * will be able to indicate whether it should result in rollback or continue on
+ * toward a commit.</p>
+ *
+ * <p>
+ * A process session instance may be used continuously. That is, after each
+ * commit or rollback, the session can be used again.</p>
+ *
+ * @author unattributed
+ */
+ public interface ProcessSession {
+
+ /**
+ * <p>
+ * Commits the current session ensuring all operations against FlowFiles
+ * within this session are atomically persisted. All FlowFiles operated on
+ * within this session must be accounted for by transfer or removal or the
+ * commit will fail.</p>
+ *
+ * <p>
+ * As soon as the commit completes the session is again ready to be used</p>
+ *
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session.
+ * @throws FlowFileHandlingException if not all FlowFiles acted upon within
+ * this session are accounted for by user code such that they have a
+ * transfer identified or were marked for removal. Automated rollback
+ * occurs.
+ * @throws ProcessException if some general fault occurs while persisting
+ * the session. Initiates automatic rollback. The root cause can be obtained
+ * via <code>Exception.getCause()</code>
+ */
+ void commit();
+
+ /**
+ * Reverts any changes made during this session. All FlowFiles are restored
+ * back to their initial session state and back to their original queues. If
+ * this session is already committed or rolled back then no changes will
+ * occur. This method can be called any number of times. Calling this method
+ * is identical to calling {@link #rollback(boolean)} passing
+ * <code>false</code> as the parameter, i.e., the restored FlowFiles are not
+ * penalized.
+ */
+ void rollback();
+
+ /**
+ * Reverts any changes made during this session. All FlowFiles are restored
+ * back to their initial session state and back to their original queues,
+ * after optionally being penalized. If this session is already committed or
+ * rolled back then no changes will occur. This method can be called any
+ * number of times.
+ *
+ * @param penalize <code>true</code> if the FlowFiles that are being
+ * restored back to their queues should be penalized; <code>false</code>
+ * otherwise
+ */
+ void rollback(boolean penalize);
+
+ /**
+ * Adjusts counter data for the given counter name and takes care of
+ * registering the counter if not already present. Unless
+ * <code>immediate</code> is true, the adjustment occurs only if and when
+ * the ProcessSession is committed.
+ *
+ * @param name the name of the counter
+ * @param delta the delta by which to modify the counter (+ or -)
+ * @param immediate if true, the counter will be updated immediately,
+ * without regard to whether the ProcessSession is committed or rolled back;
+ * otherwise, the counter will be incremented only if and when the
+ * ProcessSession is committed.
+ */
+ void adjustCounter(String name, long delta, boolean immediate);
+
+ /**
+ * @return the FlowFile that is the next highest priority FlowFile to
+ * process, or <code>null</code> otherwise (presumably when no FlowFile is
+ * available - confirm against the implementation)
+ */
+ FlowFile get();
+
+ /**
+ * Returns up to <code>maxResults</code> FlowFiles from the work queue. If
+ * no FlowFiles are available, returns an empty list. Will not return null.
+ * If multiple incoming queues are present, the behavior is unspecified in
+ * terms of whether all queues or only a single queue will be polled in a
+ * single call.
+ *
+ * @param maxResults the maximum number of FlowFiles to return
+ * @return a list of at most <code>maxResults</code> FlowFiles; possibly
+ * empty but never null
+ * @throws IllegalArgumentException if <code>maxResults</code> is less than
+ * 0
+ */
+ List<FlowFile> get(int maxResults);
+
+ /**
+ * <p>
+ * Returns all FlowFiles from all of the incoming queues for which the given
+ * {@link FlowFileFilter} indicates should be accepted. Calls to this method
+ * provide exclusive access to the underlying queues. I.e., no other thread
+ * will be permitted to pull FlowFiles from this Processor's queues or add
+ * FlowFiles to this Processor's incoming queues until this method call has
+ * returned.
+ * </p>
+ *
+ * @param filter the filter that decides which FlowFiles are accepted
+ * @return the FlowFiles that the given filter accepted
+ */
+ List<FlowFile> get(FlowFileFilter filter);
+
+ /**
+ * Reports how much work is currently queued for the Processor that owns
+ * this ProcessSession.
+ *
+ * @return the QueueSize that represents the number of FlowFiles and their
+ * combined data size for all FlowFiles waiting to be processed by the
+ * Processor that owns this ProcessSession, regardless of which Connection
+ * the FlowFiles live on
+ */
+ QueueSize getQueueSize();
+
+ /**
- * @return the set of all relationships for which space is available to
- * receive new objects
- */
- Set<Relationship> getAvailableRelationships();
-
- /**
+ * Creates a new FlowFile in the repository with no content and without any
+ * linkage to a parent FlowFile. This method is appropriate only when data
+ * is received or created from an external system. Otherwise, this method
+ * should be avoided and should instead use {@link #create(FlowFile)} or
+ * {@link #create(Collection)}.
+ *
+ * <p>
+ * When this method is used, a Provenance CREATE or RECEIVE Event should be
+ * generated. See the {@link #getProvenanceReporter()} method and
+ * {@link ProvenanceReporter} class for more information</p>
+ *
+ * @return newly created FlowFile
+ */
+ FlowFile create();
+
+ /**
+ * Creates a new FlowFile in the repository with no content but with a
+ * parent linkage to <code>parent</code>. The newly created FlowFile will
+ * inherit all of the parent's attributes except for the UUID. This method
+ * will automatically generate a Provenance FORK event or a Provenance JOIN
+ * event, depending on whether or not other FlowFiles are generated from the
+ * same parent before the ProcessSession is committed.
+ *
+ * @param parent the FlowFile to link the new FlowFile to and to inherit
+ * attributes from
+ * @return the newly created FlowFile
+ */
+ FlowFile create(FlowFile parent);
+
+ /**
+ * Creates a new FlowFile in the repository with no content but with a
+ * parent linkage to the FlowFiles specified by the parents Collection. The
+ * newly created FlowFile will inherit all of the attributes that are in
+ * common to all parents (except for the UUID, which will be in common if
+ * only a single parent exists). This method will automatically generate a
+ * Provenance JOIN event.
+ *
+ * @param parents the FlowFiles to link the new FlowFile to and to inherit
+ * the common attributes from
+ * @return the newly created FlowFile
+ */
+ FlowFile create(Collection<FlowFile> parents);
+
+ /**
+ * Creates a new FlowFile that is a clone of the given FlowFile as of the
+ * time this is called, both in content and attributes. This method
+ * automatically emits a Provenance CLONE Event.
+ *
+ * @param example FlowFile to be the source of cloning - given FlowFile must
+ * be a part of the given session
+ * @return FlowFile that is a clone of the given example
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s)
+ * @throws FlowFileHandlingException if the given FlowFile is already
+ * transferred or removed or doesn't belong to this session. Automatic
+ * rollback will occur.
+ * @throws MissingFlowFileException if the given FlowFile content cannot be
+ * found. The FlowFile should no longer be referenced, will be internally
+ * destroyed, and the session is automatically rolled back and what is left
+ * of the FlowFile is destroyed.
+ * @throws FlowFileAccessException if some IO problem occurs accessing
+ * FlowFile content
+ * @throws NullPointerException if the argument is null
+ */
+ FlowFile clone(FlowFile example);
+
+ /**
+ * Creates a new FlowFile whose parent is the given FlowFile. The content of
+ * the new FlowFile will be a subset of the byte sequence of the given
+ * FlowFile starting at the specified offset and with the length specified.
+ * The new FlowFile will contain all of the attributes of the original. This
+ * method automatically emits a Provenance FORK Event (or a Provenance CLONE
+ * Event, if the offset is 0 and the size is exactly equal to the size of
+ * the example FlowFile).
+ *
+ * @param example the FlowFile that becomes the parent of the new FlowFile
+ * and supplies its content and attributes
+ * @param offset the position within the example's content at which the new
+ * FlowFile's content starts
+ * @param size the length of the new FlowFile's content
+ * @return a FlowFile with the specified size whose parent is the first
+ * argument to this function
+ *
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile
+ * @throws FlowFileHandlingException if the given FlowFile is already
+ * transferred or removed or doesn't belong to this session, or if the
+ * specified offset + size exceeds that of the size of the example FlowFile.
+ * Automatic rollback will occur.
+ * @throws MissingFlowFileException if the given FlowFile content cannot be
+ * found. The FlowFile should no longer be referenced, will be internally
+ * destroyed, and the session is automatically rolled back and what is left
+ * of the FlowFile is destroyed.
+ */
+ FlowFile clone(FlowFile example, long offset, long size);
+
+ /**
+ * Sets a penalty for the given FlowFile which will make it unavailable to
+ * be operated on any further during the penalty period.
+ *
+ * @param flowFile to penalize
+ * @return FlowFile the new FlowFile reference to use
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s)
+ * @throws FlowFileHandlingException if the given FlowFile is already
+ * transferred or removed or doesn't belong to this session. Automatic
+ * rollback will occur.
+ * @throws NullPointerException if the argument is null
+ */
+ FlowFile penalize(FlowFile flowFile);
+
+ /**
+ * Updates the given FlowFile's attributes with the given key/value pair. If
+ * the key is named {@code uuid}, this attribute will be ignored.
+ *
+ * @param flowFile to update
+ * @param key of attribute
+ * @param value of attribute
+ * @return FlowFile the updated FlowFile
+ * @throws FlowFileHandlingException if the given FlowFile is already
+ * transferred or removed or doesn't belong to this session. Automatic
+ * rollback will occur.
+ * @throws NullPointerException if an argument is null
+ */
+ FlowFile putAttribute(FlowFile flowFile, String key, String value);
+
+ /**
+ * Updates the given FlowFile's attributes with the given key/value pairs.
+ * If the map contains a key named {@code uuid}, this attribute will be
+ * ignored.
+ *
+ * @param flowFile to update
+ * @param attributes the attributes to add to the given FlowFile
+ * @return FlowFile the updated FlowFile
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s)
+ * @throws FlowFileHandlingException if the given FlowFile is already
+ * transferred or removed or doesn't belong to this session. Automatic
+ * rollback will occur.
+ * @throws NullPointerException if an argument is null
+ */
+ FlowFile putAllAttributes(FlowFile flowFile, Map<String, String> attributes);
+
+ /**
+ * Removes the given FlowFile attribute with the given key. If the key is
+ * named {@code uuid}, this method will return the same FlowFile without
+ * removing any attribute.
+ *
+ * @param flowFile to update
+ * @param key of attribute
+ * @return FlowFile the updated FlowFile
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s)
+ * @throws FlowFileHandlingException if the given FlowFile is already
+ * transferred or removed or doesn't belong to this session. Automatic
+ * rollback will occur.
+ * @throws NullPointerException if an argument is null
+ */
+ FlowFile removeAttribute(FlowFile flowFile, String key);
+
+ /**
+ * Removes the attributes with the given keys from the given FlowFile. If
+ * the set of keys contains the value {@code uuid}, this key will be
+ * ignored.
+ *
+ * @param flowFile to update
+ * @param keys of the attributes to remove
+ * @return FlowFile the updated FlowFile
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s)
+ * @throws FlowFileHandlingException if the given FlowFile is already
+ * transferred or removed or doesn't belong to this session. Automatic
+ * rollback will occur.
+ * @throws NullPointerException if an argument is null
+ */
+ FlowFile removeAllAttributes(FlowFile flowFile, Set<String> keys);
+
+ /**
+ * Removes all attributes from the given FlowFile that have keys which match
+ * the given pattern. If the pattern matches the key {@code uuid}, this key
+ * will not be removed.
+ *
+ * @param flowFile to update
+ * @param keyPattern may be null; if supplied is matched against each of the
+ * FlowFile attribute keys
+ * @return FlowFile containing only attributes which did not meet the key
+ * pattern
+ */
+ FlowFile removeAllAttributes(FlowFile flowFile, Pattern keyPattern);
+
+ /**
+ * Transfers the given FlowFile to the appropriate destination processor
+ * work queue(s) based on the given relationship. If the relationship leads
+ * to more than one destination the state of the FlowFile is replicated such
+ * that each destination receives an exact copy of the FlowFile though each
+ * will have its own unique identity. The destination processors will not be
+ * able to operate on the given FlowFile until this session is committed or
+ * until the ownership of the session is migrated to another processor. If
+ * ownership of the session is passed to a destination processor then that
+ * destination processor will have immediate visibility of the transferred
+ * FlowFiles within the session.
+ *
+ * @param flowFile the FlowFile to transfer
+ * @param relationship the relationship that determines the destination(s)
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s)
+ * @throws FlowFileHandlingException if the given FlowFile is already
+ * transferred or removed or doesn't belong to this session. Automatic
+ * rollback will occur.
+ * @throws NullPointerException if an argument is null
+ * @throws IllegalArgumentException if given relationship is not a known or
+ * registered relationship
+ */
+ void transfer(FlowFile flowFile, Relationship relationship);
+
+ /**
+ * Transfers the given FlowFile back to the work queue from which it was
+ * pulled. The processor will not be able to operate on the given FlowFile
+ * until this session is committed. Any modifications that have been made to
+ * the FlowFile will be maintained. FlowFiles that are created by the
+ * processor cannot be transferred back to themselves via this method.
+ *
+ * @param flowFile the FlowFile to return to its original work queue
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s)
+ * @throws FlowFileHandlingException if the given FlowFile is already
+ * transferred or removed or doesn't belong to this session. Automatic
+ * rollback will occur.
+ * @throws IllegalArgumentException if the FlowFile was created by this
+ * processor
+ * @throws NullPointerException if the argument is null
+ */
+ void transfer(FlowFile flowFile);
+
+ /**
+ * Transfers the given FlowFiles back to the work queues from which the
+ * FlowFiles were pulled. The processor will not be able to operate on the
+ * given FlowFiles until this session is committed. Any modifications that
+ * have been made to the FlowFiles will be maintained. FlowFiles that are
+ * created by the processor cannot be transferred back to themselves via
+ * this method.
+ *
+ * @param flowFiles the FlowFiles to return to their original work queues
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s)
+ * @throws FlowFileHandlingException if the given FlowFiles are already
+ * transferred or removed or don't belong to this session. Automatic
+ * rollback will occur.
+ * @throws IllegalArgumentException if the FlowFile was created by this
+ * processor
+ * @throws NullPointerException if the argument is null
+ */
+ void transfer(Collection<FlowFile> flowFiles);
+
+ /**
+ * Transfers the given FlowFiles to the appropriate destination processor
+ * work queue(s) based on the given relationship. If the relationship leads
+ * to more than one destination the state of each FlowFile is replicated
+ * such that each destination receives an exact copy of the FlowFile though
+ * each will have its own unique identity. The destination processors will
+ * not be able to operate on the given FlowFiles until this session is
+ * committed or until the ownership of the session is migrated to another
+ * processor. If ownership of the session is passed to a destination
+ * processor then that destination processor will have immediate visibility
+ * of the transferred FlowFiles within the session.
+ *
+ * @param flowFiles the FlowFiles to transfer
+ * @param relationship the relationship that determines the destination(s)
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s)
+ * @throws FlowFileHandlingException if the given FlowFile is already
+ * transferred or removed or doesn't belong to this session. Automatic
+ * rollback will occur.
+ * @throws NullPointerException if an argument is null
+ * @throws IllegalArgumentException if given relationship is not a known or
+ * registered relationship
+ */
+ void transfer(Collection<FlowFile> flowFiles, Relationship relationship);
+
+ /**
+ * Ends the managed persistence for the given FlowFile. The persistent
+ * attributes for the FlowFile are deleted and so is the content assuming
+ * nothing else references it and this FlowFile will no longer be available
+ * for further operation.
+ *
+ * @param flowFile the FlowFile to remove
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s)
+ * @throws FlowFileHandlingException if the given FlowFile is already
+ * transferred or removed or doesn't belong to this session. Automatic
+ * rollback will occur.
+ */
+ void remove(FlowFile flowFile);
+
+ /**
+ * Ends the managed persistence for the given FlowFiles. The persistent
+ * attributes for each FlowFile are deleted and so is the content assuming
+ * nothing else references it and the FlowFiles will no longer be available
+ * for further operation.
+ *
+ * @param flowFiles the FlowFiles to remove
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s)
+ * @throws FlowFileHandlingException if any of the given FlowFiles is
+ * already transferred or removed or doesn't belong to this session.
+ * Automatic rollback will occur.
+ */
+ void remove(Collection<FlowFile> flowFiles);
+
+ /**
+ * Executes the given callback against the contents corresponding to the
+ * given FlowFile.
+ *
+ * @param source the FlowFile whose content the callback is executed
+ * against
+ * @param reader the callback to execute
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s)
+ * @throws FlowFileHandlingException if the given FlowFile is already
+ * transferred or removed or doesn't belong to this session. Automatic
+ * rollback will occur.
+ * @throws MissingFlowFileException if the given FlowFile content cannot be
+ * found. The FlowFile should no longer be referenced, will be internally
+ * destroyed, and the session is automatically rolled back and what is left
+ * of the FlowFile is destroyed.
+ * @throws FlowFileAccessException if some IO problem occurs accessing
+ * FlowFile content
+ */
+ void read(FlowFile source, InputStreamCallback reader);
+
+ /**
+ * Combines the content of all given source FlowFiles into a single given
+ * destination FlowFile.
+ *
+ * @param sources the FlowFiles whose content is combined
+ * @param destination the FlowFile that receives the combined content
+ * @return updated destination FlowFile (new size, etc...)
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s)
+ * @throws IllegalArgumentException if the given destination is contained
+ * within the sources
+ * @throws FlowFileHandlingException if the given FlowFile is already
+ * transferred or removed or doesn't belong to this session. Automatic
+ * rollback will occur.
+ * @throws MissingFlowFileException if the given FlowFile content cannot be
+ * found. The FlowFile should no longer be referenced, will be internally
+ * destroyed, and the session is automatically rolled back and what is left
+ * of the FlowFile is destroyed.
+ * @throws FlowFileAccessException if some IO problem occurs accessing
+ * FlowFile content. The state of the destination will be as it was prior to
+ * this call.
+ */
+ FlowFile merge(Collection<FlowFile> sources, FlowFile destination);
+
+ /**
+ * Combines the content of all given source FlowFiles into a single given
+ * destination FlowFile, optionally surrounding the merged output with a
+ * header and footer and separating the merged objects with a demarcator.
+ *
+ * @param sources the FlowFiles whose content is combined
+ * @param destination the FlowFile that receives the combined content
+ * @param header bytes that will be added to the beginning of the merged
+ * output. May be null or empty.
+ * @param footer bytes that will be added to the end of the merged output.
+ * May be null or empty.
+ * @param demarcator bytes that will be placed in between each object merged
+ * together. May be null or empty.
+ * @return updated destination FlowFile (new size, etc...)
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s)
+ * @throws IllegalArgumentException if the given destination is contained
+ * within the sources
+ * @throws FlowFileHandlingException if the given FlowFile is already
+ * transferred or removed or doesn't belong to this session. Automatic
+ * rollback will occur.
+ * @throws MissingFlowFileException if the given FlowFile content cannot be
+ * found. The FlowFile should no longer be referenced, will be internally
+ * destroyed, and the session is automatically rolled back and what is left
+ * of the FlowFile is destroyed.
+ * @throws FlowFileAccessException if some IO problem occurs accessing
+ * FlowFile content. The state of the destination will be as it was prior to
+ * this call.
+ */
+ FlowFile merge(Collection<FlowFile> sources, FlowFile destination, byte[] header, byte[] footer, byte[] demarcator);
+
+ /**
+ * Executes the given callback against the content corresponding to the
+ * given FlowFile
+ *
+ * @param source the FlowFile whose content the callback is executed
+ * against
+ * @param writer the callback to execute
+ * @return updated FlowFile
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s)
+ * @throws FlowFileHandlingException if the given FlowFile is already
+ * transferred or removed or doesn't belong to this session. Automatic
+ * rollback will occur.
+ * @throws MissingFlowFileException if the given FlowFile content cannot be
+ * found. The FlowFile should no longer be referenced, will be internally
+ * destroyed, and the session is automatically rolled back and what is left
+ * of the FlowFile is destroyed.
+ * @throws FlowFileAccessException if some IO problem occurs accessing
+ * FlowFile content
+ */
+ FlowFile write(FlowFile source, OutputStreamCallback writer);
+
+ /**
+ * Executes the given callback against the content corresponding to the
+ * given FlowFile
+ *
+ * @param source the FlowFile whose content the callback is executed
+ * against
+ * @param writer the callback to execute
+ * @return updated FlowFile
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s)
+ * @throws FlowFileHandlingException if the given FlowFile is already
+ * transferred or removed or doesn't belong to this session. Automatic
+ * rollback will occur.
+ * @throws MissingFlowFileException if the given FlowFile content cannot be
+ * found. The FlowFile should no longer be referenced, will be internally
+ * destroyed, and the session is automatically rolled back and what is left
+ * of the FlowFile is destroyed.
+ * @throws FlowFileAccessException if some IO problem occurs accessing
+ * FlowFile content
+ */
+ FlowFile write(FlowFile source, StreamCallback writer);
+
+ /**
+ * Executes the given callback against the content corresponding to the
+ * given FlowFile, such that any data written to the OutputStream of the
+ * content will be appended to the end of FlowFile.
+ *
+ * @param source the FlowFile whose content is appended to
+ * @param writer the callback that writes the data to append
+ * @return the updated FlowFile
+ */
+ FlowFile append(FlowFile source, OutputStreamCallback writer);
+
+ /**
+ * Writes to the given FlowFile all content from the given content path.
+ *
+ * @param source the file from which content will be obtained
+ * @param keepSourceFile if true the content is simply copied; if false the
+ * original content might be used in a destructive way for efficiency such
+ * that the repository will have the data but the original data will be
+ * gone. If false the source object will be removed or gone once imported.
+ * It will not be restored if the session is rolled back so this must be
+ * used with caution. In some cases it can result in tremendous efficiency
+ * gains but is also dangerous.
+ * @param destination the FlowFile whose content will be updated
+ * @return the updated destination FlowFile (new size)
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s)
+ * @throws FlowFileHandlingException if the given FlowFile is already
+ * transferred or removed or doesn't belong to this session. Automatic
+ * rollback will occur.
+ * @throws MissingFlowFileException if the given FlowFile content cannot be
+ * found. The FlowFile should no longer be referenced, will be internally
+ * destroyed, and the session is automatically rolled back and what is left
+ * of the FlowFile is destroyed.
+ * @throws FlowFileAccessException if some IO problem occurs accessing
+ * FlowFile content
+ */
+ FlowFile importFrom(Path source, boolean keepSourceFile, FlowFile destination);
+
+ /**
+ * Writes to the given FlowFile all content from the given input stream.
+ *
+ * @param source the stream from which content will be obtained
+ * @param destination the FlowFile whose content will be updated
+ * @return the updated destination FlowFile (new size)
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s)
+ * @throws FlowFileHandlingException if the given FlowFile is already
+ * transferred or removed or doesn't belong to this session. Automatic
+ * rollback will occur.
+ * @throws MissingFlowFileException if the given FlowFile content cannot be
+ * found. The FlowFile should no longer be referenced, will be internally
+ * destroyed, and the session is automatically rolled back and what is left
+ * of the FlowFile is destroyed.
+ * @throws FlowFileAccessException if some IO problem occurs accessing
+ * FlowFile content
+ */
+ FlowFile importFrom(InputStream source, FlowFile destination);
+
+ /**
+ * Writes the content of the given FlowFile to the given destination path.
+ *
+ * @param flowFile the FlowFile whose content will be written out
+ * @param destination the path to which the content will be written
+ * @param append if true will append to the current content at the given
+ * path; if false will replace any current content
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s)
+ * @throws FlowFileHandlingException if the given FlowFile is already
+ * transferred or removed or doesn't belong to this session. Automatic
+ * rollback will occur.
+ * @throws MissingFlowFileException if the given FlowFile content cannot be
+ * found. The FlowFile should no longer be referenced, will be internally
+ * destroyed, and the session is automatically rolled back and what is left
+ * of the FlowFile is destroyed.
+ * @throws FlowFileAccessException if some IO problem occurs accessing
+ * FlowFile content
+ */
+ void exportTo(FlowFile flowFile, Path destination, boolean append);
+
+ /**
+ * Writes the content of the given FlowFile to the given destination stream.
+ *
+ * @param flowFile the FlowFile whose content will be written out
+ * @param destination the stream to which the content will be written
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s)
+ * @throws FlowFileHandlingException if the given FlowFile is already
+ * transferred or removed or doesn't belong to this session. Automatic
+ * rollback will occur.
+ * @throws MissingFlowFileException if the given FlowFile content cannot be
+ * found. The FlowFile should no longer be referenced, will be internally
+ * destroyed, and the session is automatically rolled back and what is left
+ * of the FlowFile is destroyed.
+ * @throws FlowFileAccessException if some IO problem occurs accessing
+ * FlowFile content
+ */
+ void exportTo(FlowFile flowFile, OutputStream destination);
+
+ /**
+ * Returns a ProvenanceReporter that is tied to this ProcessSession.
+ *
+ * @return the ProvenanceReporter tied to this ProcessSession
+ */
+ ProvenanceReporter getProvenanceReporter();
+ }