Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/20 15:01:52 UTC
[16/42] incubator-nifi git commit: Merge branch 'develop' of
https://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 0000000,3d3e854..dcb461c
mode 000000,100644..100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@@ -1,0 -1,2689 +1,2684 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.controller.repository;
+
+ import java.io.ByteArrayInputStream;
+ import java.io.EOFException;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.nio.file.Files;
+ import java.nio.file.Path;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.LinkedHashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.NoSuchElementException;
+ import java.util.Set;
+ import java.util.UUID;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicLong;
+ import java.util.regex.Pattern;
+
+ import org.apache.nifi.connectable.Connectable;
+ import org.apache.nifi.connectable.Connection;
+ import org.apache.nifi.controller.FlowFileQueue;
+ import org.apache.nifi.controller.ProcessorNode;
+ import org.apache.nifi.controller.repository.claim.ContentClaim;
+ import org.apache.nifi.controller.repository.io.ByteCountingInputStream;
+ import org.apache.nifi.controller.repository.io.ByteCountingOutputStream;
+ import org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream;
+ import org.apache.nifi.controller.repository.io.FlowFileAccessInputStream;
+ import org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream;
+ import org.apache.nifi.controller.repository.io.LimitedInputStream;
+ import org.apache.nifi.controller.repository.io.LongHolder;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.stream.io.BufferedOutputStream;
+ import org.apache.nifi.stream.io.NonCloseableInputStream;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.FlowFileFilter;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.QueueSize;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.processor.exception.FlowFileAccessException;
+ import org.apache.nifi.processor.exception.FlowFileHandlingException;
+ import org.apache.nifi.processor.exception.MissingFlowFileException;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.processor.io.StreamCallback;
+ import org.apache.nifi.provenance.ProvenanceEventBuilder;
+ import org.apache.nifi.provenance.ProvenanceEventRecord;
+ import org.apache.nifi.provenance.ProvenanceEventRepository;
+ import org.apache.nifi.provenance.ProvenanceEventType;
+ import org.apache.nifi.provenance.ProvenanceReporter;
+ import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+ import org.apache.nifi.util.NiFiProperties;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ /**
+ * <p>
+ * Provides a ProcessSession that ensures all accesses, changes and transfers
+ * occur in an atomic manner for all FlowFiles including their contents and
+ * attributes</p>
+ * <p>
+ * NOT THREAD SAFE</p>
+ * <p/>
+ * @author none
+ */
+ public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher {
+
+ private static final AtomicLong idGenerator = new AtomicLong(0L);
+
+ // the number of FlowFiles that must be transferred, removed, or modified before we stop logging individual FlowFile IDs on commit/rollback
+ public static final int VERBOSE_LOG_THRESHOLD = 10;
+ private static final long MAX_APPENDABLE_CLAIM_SIZE = DataUnit.parseDataSize(
+ NiFiProperties.getInstance().getMaxAppendableClaimSize(), DataUnit.B).longValue();
+ private static final int MAX_FLOWFILES_PER_CLAIM = NiFiProperties.getInstance().getMaxFlowFilesPerClaim();
+
+ public static final String DEFAULT_FLOWFILE_PATH = "./";
+
+ private static final Logger LOG = LoggerFactory.getLogger(StandardProcessSession.class);
+ private static final Logger claimLog = LoggerFactory.getLogger(StandardProcessSession.class.getSimpleName() + ".claims");
+
+ private final Map<FlowFileRecord, StandardRepositoryRecord> records = new HashMap<>();
+ private final Map<Connection, StandardFlowFileEvent> connectionCounts = new HashMap<>();
+ private final Map<Connection, Set<FlowFileRecord>> unacknowledgedFlowFiles = new HashMap<>();
+ private final Map<String, Long> localCounters = new HashMap<>();
+ private final Map<String, Long> globalCounters = new HashMap<>();
+ private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new HashMap<>();
+ private final ProcessContext context;
+ private final Set<FlowFile> recursionSet = new HashSet<>();// tracks the FlowFiles currently being operated on, to prevent logic failures if recursive calls occur
+ private final Set<Path> deleteOnCommit = new HashSet<>();
+ private final long sessionId;
+ private final String connectableDescription;
+
+ private final Set<String> removedFlowFiles = new HashSet<>();
+ private final Set<String> createdFlowFiles = new HashSet<>();
+
+ private final StandardProvenanceReporter provenanceReporter;
+
+ private int removedCount = 0; // number of flowfiles removed in this session
+ private long removedBytes = 0L; // size of all flowfiles removed in this session
+ private LongHolder bytesRead = new LongHolder(0L);
+ private LongHolder bytesWritten = new LongHolder(0L);
+ private int flowFilesIn = 0, flowFilesOut = 0;
+ private long contentSizeIn = 0L, contentSizeOut = 0L;
+ private int writeRecursionLevel = 0;
+
+ private ContentClaim currentWriteClaim = null;
+ private OutputStream currentWriteClaimStream = null;
+ private long currentWriteClaimSize = 0L;
+ private int currentWriteClaimFlowFileCount = 0;
+
+ private ContentClaim currentReadClaim = null;
+ private ByteCountingInputStream currentReadClaimStream = null;
+ private long processingStartTime;
+
+ // maps a FlowFile to all Provenance Events that were generated for that FlowFile.
+ // we do this so that if we generate a Fork event, for example, and then remove the FlowFile in the same
+ // Session, we will not send that event to the Provenance Repository
+ private Map<FlowFile, List<ProvenanceEventRecord>> generatedProvenanceEvents = new HashMap<>();
+
+ // when Forks are generated for a single parent, we add the Fork event to this map, with the Key being the parent
+ // so that we are able to aggregate many into a single Fork Event.
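+ // (e.g. three children forked from one parent yield a single FORK event listing three
+ // child UUIDs rather than three separate FORK events)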
+ private Map<FlowFile, ProvenanceEventBuilder> forkEventBuilders = new HashMap<>();
+
+ private Checkpoint checkpoint = new Checkpoint();
+
+ public StandardProcessSession(final ProcessContext context) {
+ this.context = context;
+
+ final Connectable connectable = context.getConnectable();
+ final String componentType;
+
+ String description = connectable.toString();
+ switch (connectable.getConnectableType()) {
+ case PROCESSOR:
+ final ProcessorNode procNode = (ProcessorNode) connectable;
+ componentType = procNode.getProcessor().getClass().getSimpleName();
+ description = procNode.getProcessor().toString();
+ break;
+ case INPUT_PORT:
+ componentType = "Input Port";
+ break;
+ case OUTPUT_PORT:
+ componentType = "Output Port";
+ break;
+ case REMOTE_INPUT_PORT:
+ componentType = "Remote Input Port";
+ break;
+ case REMOTE_OUTPUT_PORT:
+ componentType = "Remote Output Port";
+ break;
+ case FUNNEL:
+ componentType = "Funnel";
+ break;
+ default:
+ throw new AssertionError("Connectable type is " + connectable.getConnectableType());
+ }
+
+ this.provenanceReporter = new StandardProvenanceReporter(connectable.getIdentifier(), componentType, context.getProvenanceRepository(), this);
+ this.sessionId = idGenerator.getAndIncrement();
+ this.connectableDescription = description;
+
+ LOG.trace("Session {} created for {}", this, connectableDescription);
+ processingStartTime = System.nanoTime();
+ }
+
+ public void checkpoint() {
+ if (!recursionSet.isEmpty()) {
+ throw new IllegalStateException("Cannot checkpoint session while a read or write callback is in progress");
+ }
+
+ if (this.checkpoint == null) {
+ this.checkpoint = new Checkpoint();
+ }
+
+ if (records.isEmpty()) {
+ LOG.trace("{} checkpointed, but no events were performed by this ProcessSession", this);
+ return;
+ }
+
+ // any drop event that is the result of an auto-terminate should happen at the very end, so we keep the
+ // records in a separate List so that they can be persisted to the Provenance Repo after all of the
+ // Processor-reported events.
+ List<ProvenanceEventRecord> autoTerminatedEvents = null;
+
+ // validate that every record has a transfer relationship; if so, determine the destination node and clone as necessary
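+ // (e.g. if a Relationship has two outbound Connections, the original FlowFile is enqueued
+ // on one queue and a clone, with a CLONE provenance event, on the other)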
+ final Map<FlowFileRecord, StandardRepositoryRecord> toAdd = new HashMap<>();
+ for (final StandardRepositoryRecord record : records.values()) {
+ if (record.isMarkedForDelete()) {
+ continue;
+ }
+ final Relationship relationship = record.getTransferRelationship();
+ if (relationship == null) {
+ rollback();
+ throw new FlowFileHandlingException(record.getCurrent() + " transfer relationship not specified");
+ }
+ final List<Connection> destinations = new ArrayList<>(context.getConnections(relationship));
+ if (destinations.isEmpty() && !context.getConnectable().isAutoTerminated(relationship)) {
+ if (relationship != Relationship.SELF) {
+ rollback();
+ throw new FlowFileHandlingException(relationship + " does not have any destinations for " + context.getConnectable());
+ }
+ }
+
+ if (destinations.isEmpty() && relationship == Relationship.SELF) {
+ record.setDestination(record.getOriginalQueue());
+ } else if (destinations.isEmpty()) {
+ record.markForDelete();
+
+ if (autoTerminatedEvents == null) {
+ autoTerminatedEvents = new ArrayList<>();
+ }
+
+ final ProvenanceEventRecord dropEvent;
+ try {
+ dropEvent = provenanceReporter.generateDropEvent(record.getCurrent(), "Auto-Terminated by " + relationship.getName() + " Relationship");
+ autoTerminatedEvents.add(dropEvent);
+ } catch (final Exception e) {
+ LOG.warn("Unable to generate Provenance Event for {} on behalf of {} due to {}", record.getCurrent(), connectableDescription, e);
+ if (LOG.isDebugEnabled()) {
+ LOG.warn("", e);
+ }
+ }
+ } else {
+ final Connection finalDestination = destinations.remove(destinations.size() - 1); //remove last element
+ record.setDestination(finalDestination.getFlowFileQueue());
+ incrementConnectionInputCounts(finalDestination, record);
+
+ for (final Connection destination : destinations) { //iterate over remaining destinations and "clone" as needed
+ incrementConnectionInputCounts(destination, record);
+ final FlowFileRecord currRec = record.getCurrent();
+ final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec);
+ builder.id(context.getNextFlowFileSequence());
+
+ final String newUuid = UUID.randomUUID().toString();
+ builder.addAttribute(CoreAttributes.UUID.key(), newUuid);
+
+ final FlowFileRecord clone = builder.build();
+ final StandardRepositoryRecord newRecord = new StandardRepositoryRecord(destination.getFlowFileQueue());
+ getProvenanceReporter().clone(currRec, clone);
+
+ final ContentClaim claim = clone.getContentClaim();
+ if (claim != null) {
+ context.getContentRepository().incrementClaimaintCount(claim);
+ }
+ newRecord.setWorking(clone, CoreAttributes.UUID.key(), newUuid);
+
+ newRecord.setDestination(destination.getFlowFileQueue());
+ newRecord.setTransferRelationship(record.getTransferRelationship());
+ // put the mapping into toAdd because adding to records now will cause a ConcurrentModificationException
+ toAdd.put(clone, newRecord);
+ }
+ }
+ }
+
+ records.putAll(toAdd);
+ toAdd.clear();
+
+ checkpoint.checkpoint(this, autoTerminatedEvents);
+ resetState();
+ }
+
+ @Override
+ public void commit() {
+ checkpoint();
+ commit(this.checkpoint);
+ this.checkpoint = null;
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private void commit(final Checkpoint checkpoint) {
+ final long commitStartNanos = System.nanoTime();
+
+ resetWriteClaims();
+ resetReadClaim();
+
+ final long updateProvenanceStart = System.nanoTime();
+ updateProvenanceRepo(checkpoint);
+
+ final long claimRemovalStart = System.nanoTime();
+ final long updateProvenanceNanos = claimRemovalStart - updateProvenanceStart;
+
+ // Figure out which content claims can be released.
+ // At this point, we will decrement the Claimant Count for the claims via the Content Repository.
+ // We do not actually destroy the content because otherwise, we could remove the
+ // Original Claim and crash/restart before the FlowFileRepository is updated. This would result in the FlowFile being restored such that
+ // the content claim points to the Original Claim -- which has already been removed!
+ for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) {
+ final FlowFile flowFile = entry.getKey();
+ final StandardRepositoryRecord record = entry.getValue();
+
+ if (record.isMarkedForDelete()) {
+ // if the working claim is not the same as the original claim, we can immediately destroy the working claim
+ // because it was created in this session and is to be deleted. We don't need to wait for the FlowFile Repo to sync.
+ removeContent(record.getWorkingClaim());
+
+ if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getWorkingClaim())) {
+ // if working & original claim are same, don't remove twice; we only want to remove the original
+ // if it's different from the working. Otherwise, we remove two claimant counts. This causes
+ // an issue if we only updated the FlowFile attributes.
+ removeContent(record.getOriginalClaim());
+ }
+ final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
+ final Connectable connectable = context.getConnectable();
+ final Object terminator = (connectable instanceof ProcessorNode) ? ((ProcessorNode) connectable).getProcessor() : connectable;
+ LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
+ } else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) {
+ //records which have been updated - remove original if exists
+ removeContent(record.getOriginalClaim());
+ }
+ }
+
+ final long claimRemovalFinishNanos = System.nanoTime();
+ final long claimRemovalNanos = claimRemovalFinishNanos - claimRemovalStart;
+
+ // Update the FlowFile Repository
+ try {
+ final Collection<StandardRepositoryRecord> repoRecords = checkpoint.records.values();
+ context.getFlowFileRepository().updateRepository((Collection) repoRecords);
+ } catch (final IOException ioe) {
+ rollback();
+ throw new ProcessException("FlowFile Repository failed to update", ioe);
+ }
+ final long flowFileRepoUpdateFinishNanos = System.nanoTime();
+ final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - claimRemovalFinishNanos;
+
+ updateEventRepository(checkpoint);
+
+ final long updateEventRepositoryFinishNanos = System.nanoTime();
+ final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - flowFileRepoUpdateFinishNanos;
+
+ // transfer the flowfiles to the connections' queues.
+ final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap = new HashMap<>();
+ for (final StandardRepositoryRecord record : checkpoint.records.values()) {
+ if (record.isMarkedForAbort() || record.isMarkedForDelete()) {
+ continue; //these don't need to be transferred
+ }
+ // record.getCurrent() will return null if this record was created in this session --
+ // in this case, we just ignore it, and it will be cleaned up by clearing the records map.
+ if (record.getCurrent() != null) {
+ Collection<FlowFileRecord> collection = recordMap.get(record.getDestination());
+ if (collection == null) {
+ collection = new ArrayList<>();
+ recordMap.put(record.getDestination(), collection);
+ }
+ collection.add(record.getCurrent());
+ }
+ }
+
+ for (final Map.Entry<FlowFileQueue, Collection<FlowFileRecord>> entry : recordMap.entrySet()) {
+ entry.getKey().putAll(entry.getValue());
+ }
+
+ final long enqueueFlowFileFinishNanos = System.nanoTime();
+ final long enqueueFlowFileNanos = enqueueFlowFileFinishNanos - updateEventRepositoryFinishNanos;
+
+ // Delete any files from disk that need to be removed.
+ for (final Path path : checkpoint.deleteOnCommit) {
+ try {
+ Files.deleteIfExists(path);
+ } catch (final IOException e) {
+ throw new FlowFileAccessException("Unable to delete " + path.toFile().getAbsolutePath(), e);
+ }
+ }
+ checkpoint.deleteOnCommit.clear();
+
+ if (LOG.isInfoEnabled()) {
+ final String sessionSummary = summarizeEvents(checkpoint);
+ if (!sessionSummary.isEmpty()) {
+ LOG.info("{} for {}, committed the following events: {}", new Object[]{this, connectableDescription, sessionSummary});
+ }
+ }
+
+ for (final Map.Entry<String, Long> entry : checkpoint.localCounters.entrySet()) {
+ adjustCounter(entry.getKey(), entry.getValue(), true);
+ }
+
+ for (final Map.Entry<String, Long> entry : checkpoint.globalCounters.entrySet()) {
+ adjustCounter(entry.getKey(), entry.getValue(), true);
+ }
+
+ acknowledgeRecords();
+ resetState();
+
+ if (LOG.isDebugEnabled()) {
+ final StringBuilder timingInfo = new StringBuilder();
+ timingInfo.append("Session commit for ").append(this).append(" [").append(connectableDescription).append("]").append(" took ");
+
+ final long commitNanos = System.nanoTime() - commitStartNanos;
+ formatNanos(commitNanos, timingInfo);
+ timingInfo.append("; FlowFile Repository Update took ");
+ formatNanos(flowFileRepoUpdateNanos, timingInfo);
+ timingInfo.append("; Claim Removal took ");
+ formatNanos(claimRemovalNanos, timingInfo);
+ timingInfo.append("; FlowFile Event Update took ");
+ formatNanos(updateEventRepositoryNanos, timingInfo);
+ timingInfo.append("; Enqueuing FlowFiles took ");
+ formatNanos(enqueueFlowFileNanos, timingInfo);
+ timingInfo.append("; Updating Provenance Event Repository took ");
+ formatNanos(updateProvenanceNanos, timingInfo);
+
+ LOG.debug(timingInfo.toString());
+ }
+ }
+
+ private void updateEventRepository(final Checkpoint checkpoint) {
+ int flowFilesReceived = 0;
+ int flowFilesSent = 0;
+ long bytesReceived = 0L;
+ long bytesSent = 0L;
+
+ for (final ProvenanceEventRecord event : checkpoint.reportedEvents) {
+ if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
+ continue;
+ }
+
+ switch (event.getEventType()) {
+ case SEND:
+ flowFilesSent++;
+ bytesSent += event.getFileSize();
+ break;
+ case RECEIVE:
+ flowFilesReceived++;
+ bytesReceived += event.getFileSize();
+ break;
+ default:
+ break;
+ }
+ }
+
+ try {
+ // update event repository
+ final Connectable connectable = context.getConnectable();
+ final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(connectable.getIdentifier());
+ flowFileEvent.setBytesRead(checkpoint.bytesRead);
+ flowFileEvent.setBytesWritten(checkpoint.bytesWritten);
+ flowFileEvent.setContentSizeIn(checkpoint.contentSizeIn);
+ flowFileEvent.setContentSizeOut(checkpoint.contentSizeOut);
+ flowFileEvent.setContentSizeRemoved(checkpoint.removedBytes);
+ flowFileEvent.setFlowFilesIn(checkpoint.flowFilesIn);
+ flowFileEvent.setFlowFilesOut(checkpoint.flowFilesOut);
+ flowFileEvent.setFlowFilesRemoved(checkpoint.removedCount);
+ flowFileEvent.setFlowFilesReceived(flowFilesReceived);
+ flowFileEvent.setBytesReceived(bytesReceived);
+ flowFileEvent.setFlowFilesSent(flowFilesSent);
+ flowFileEvent.setBytesSent(bytesSent);
+
+ long lineageMillis = 0L;
+ for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) {
+ final FlowFile flowFile = entry.getKey();
+ final long lineageDuration = System.currentTimeMillis() - flowFile.getLineageStartDate();
+ lineageMillis += lineageDuration;
+ }
+ flowFileEvent.setAggregateLineageMillis(lineageMillis);
+
+ context.getFlowFileEventRepository().updateRepository(flowFileEvent);
+
+ for (final FlowFileEvent connectionEvent : checkpoint.connectionCounts.values()) {
+ context.getFlowFileEventRepository().updateRepository(connectionEvent);
+ }
+ } catch (final IOException ioe) {
+ LOG.error("FlowFile Event Repository failed to update", ioe);
+ }
+ }
+
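+ // Records that an event of the given type was generated for the given FlowFile id;
+ // a simple multimap-style put into the provided map.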
+ private void addEventType(final Map<String, Set<ProvenanceEventType>> map, final String id, final ProvenanceEventType eventType) {
+ Set<ProvenanceEventType> eventTypes = map.get(id);
+ if ( eventTypes == null ) {
+ eventTypes = new HashSet<>();
+ map.put(id, eventTypes);
+ }
+
+ eventTypes.add(eventType);
+ }
+
+ private void updateProvenanceRepo(final Checkpoint checkpoint) {
+ // Update Provenance Repository
+ final ProvenanceEventRepository provenanceRepo = context.getProvenanceRepository();
+
+ // We need to de-dupe the events that we've created and those reported to the provenance reporter,
+ // in case the Processor developer submitted the same events to the reporter. We use a LinkedHashSet
+ // so that we de-dupe while still ensuring that the events are submitted in the proper order.
+ final Set<ProvenanceEventRecord> recordsToSubmit = new LinkedHashSet<>();
+ final Map<String, Set<ProvenanceEventType>> eventTypesPerFlowFileId = new HashMap<>();
+
+ final Set<ProvenanceEventRecord> processorGenerated = checkpoint.reportedEvents;
+
+ // We first want to submit FORK events because if the Processor is going to create events against
+ // a FlowFile, that FlowFile needs to be shown to be created first.
+ // However, if the Processor has generated a FORK event, we don't want to use the Framework-created one --
+ // we prefer to use the event generated by the Processor. We can determine this by checking if the Set of events generated
+ // by the Processor contains any of the FORK events that we generated
+ for (final Map.Entry<FlowFile, ProvenanceEventBuilder> entry : checkpoint.forkEventBuilders.entrySet()) {
+ final ProvenanceEventBuilder builder = entry.getValue();
+ final FlowFile flowFile = entry.getKey();
+
+ updateEventContentClaims(builder, flowFile, checkpoint.records.get(flowFile));
+ final ProvenanceEventRecord event = builder.build();
+
+ if (!event.getChildUuids().isEmpty() && !isSpuriousForkEvent(event, checkpoint.removedFlowFiles) && !processorGenerated.contains(event)) {
+ recordsToSubmit.add(event);
+
+ for ( final String childUuid : event.getChildUuids() ) {
+ addEventType(eventTypesPerFlowFileId, childUuid, event.getEventType());
+ }
+ for ( final String parentUuid : event.getParentUuids() ) {
+ addEventType(eventTypesPerFlowFileId, parentUuid, event.getEventType());
+ }
+ }
+ }
+
+ // Now add any Processor-reported events.
+ for (final ProvenanceEventRecord event : processorGenerated) {
+ if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
+ continue;
+ }
+ // Check if the event indicates that the FlowFile was routed to the same
+ // connection from which it was pulled (and only this connection). If so, discard the event.
+ if ( isSpuriousRouteEvent(event, checkpoint.records) ) {
+ continue;
+ }
+
+ recordsToSubmit.add(event);
+ addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
+ }
+
+ // Finally, add any other events that we may have generated.
+ for (final List<ProvenanceEventRecord> eventList : checkpoint.generatedProvenanceEvents.values()) {
+ for (final ProvenanceEventRecord event : eventList) {
+ if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
+ continue;
+ }
+
+ recordsToSubmit.add(event);
+ addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
+ }
+ }
+
+ // Check if content or attributes changed. If so, register the appropriate events.
+ for (final StandardRepositoryRecord repoRecord : checkpoint.records.values() ) {
+ final ContentClaim original = repoRecord.getOriginalClaim();
+ final ContentClaim current = repoRecord.getCurrentClaim();
+
+ boolean contentChanged = false;
+ if ( original == null && current != null ) {
+ contentChanged = true;
+ }
+ if ( original != null && current == null ) {
+ contentChanged = true;
+ }
+ if ( original != null && current != null && !original.equals(current) ) {
+ contentChanged = true;
+ }
+
+ final FlowFileRecord curFlowFile = repoRecord.getCurrent();
+ final String flowFileId = curFlowFile.getAttribute(CoreAttributes.UUID.key());
+ boolean eventAdded = false;
+
+ if (checkpoint.removedFlowFiles.contains(flowFileId)) {
+ continue;
+ }
+
+ final boolean newFlowFile = repoRecord.getOriginal() == null;
+ if ( contentChanged && !newFlowFile ) {
+ recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CONTENT_MODIFIED).build());
+ addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.CONTENT_MODIFIED);
+ eventAdded = true;
+ }
+
+ if ( checkpoint.createdFlowFiles.contains(flowFileId) ) {
+ final Set<ProvenanceEventType> registeredTypes = eventTypesPerFlowFileId.get(flowFileId);
+ boolean creationEventRegistered = false;
+ if ( registeredTypes != null ) {
+ if ( registeredTypes.contains(ProvenanceEventType.CREATE) ||
+ registeredTypes.contains(ProvenanceEventType.FORK) ||
+ registeredTypes.contains(ProvenanceEventType.JOIN) ||
+ registeredTypes.contains(ProvenanceEventType.RECEIVE) ) {
+ creationEventRegistered = true;
+ }
+ }
+
+ if ( !creationEventRegistered ) {
+ recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CREATE).build());
+ eventAdded = true;
+ }
+ }
+
+ if ( !eventAdded && !repoRecord.getUpdatedAttributes().isEmpty() ) {
+ // We generate an ATTRIBUTES_MODIFIED event only if no other event has been
+ // created for the FlowFile. We do this because all events contain both the
+ // newest and the original attributes, so generating an ATTRIBUTES_MODIFIED
+ // event is redundant if another already exists.
+ if ( !eventTypesPerFlowFileId.containsKey(flowFileId) ) {
+ recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.ATTRIBUTES_MODIFIED).build());
+ addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.ATTRIBUTES_MODIFIED);
+ }
+ }
+ }
+
+ // We want to submit the 'recordsToSubmit' collection, followed by the auto-terminated events to the Provenance Repository.
+ // We want to do this with a single call to ProvenanceEventRepository#registerEvents because it may be much more efficient
+ // to do so.
+ // However, we want to modify the events in 'recordsToSubmit' to obtain the data from the most recent version of the FlowFiles
+ // (except for SEND events); see note below as to why this is
+ // Therefore, we create an Iterable that can iterate over each of these events, modifying them as needed, and returning them
+ // in the appropriate order. This prevents an unnecessary step of creating an intermediate List and adding all of those values
+ // to the List.
+ // This is done in a similar vein to how Java 8's streams work, iterating over the events and returning a processed version
+ // one-at-a-time as opposed to iterating over the entire Collection and putting the results in another Collection. However,
+ // we don't want to change the Framework to require Java 8 at this time, because it's not yet as prevalent as we would desire
+ final Map<String, FlowFileRecord> flowFileRecordMap = new HashMap<>();
+ for (final StandardRepositoryRecord repoRecord : checkpoint.records.values()) {
+ final FlowFileRecord flowFile = repoRecord.getCurrent();
+ flowFileRecordMap.put(flowFile.getAttribute(CoreAttributes.UUID.key()), flowFile);
+ }
+
+ final List<ProvenanceEventRecord> autoTermEvents = checkpoint.autoTerminatedEvents;
+ final Iterable<ProvenanceEventRecord> iterable = new Iterable<ProvenanceEventRecord>() {
+ final Iterator<ProvenanceEventRecord> recordsToSubmitIterator = recordsToSubmit.iterator();
+ final Iterator<ProvenanceEventRecord> autoTermIterator = autoTermEvents == null ? null : autoTermEvents.iterator();
+
+ @Override
+ public Iterator<ProvenanceEventRecord> iterator() {
+ return new Iterator<ProvenanceEventRecord>() {
+ @Override
+ public boolean hasNext() {
+ return recordsToSubmitIterator.hasNext() || (autoTermIterator != null && autoTermIterator.hasNext());
+ }
+
+ @Override
+ public ProvenanceEventRecord next() {
+ if (recordsToSubmitIterator.hasNext()) {
+ final ProvenanceEventRecord rawEvent = recordsToSubmitIterator.next();
+
+ // Update the Provenance Event Record with all of the info that we know about the event.
+ // For SEND events, we do not want to update the FlowFile info on the Event, because the event should
+ // reflect the FlowFile as it was sent to the remote system. However, for other events, we want to use
+ // the representation of the FlowFile as it is committed, as this is the only way in which it really
+ // exists in our system -- all other representations are volatile representations that have not been
+ // exposed.
+ return enrich(rawEvent, flowFileRecordMap, checkpoint.records, rawEvent.getEventType() != ProvenanceEventType.SEND);
+ } else if (autoTermIterator != null && autoTermIterator.hasNext()) {
+ return enrich(autoTermIterator.next(), flowFileRecordMap, checkpoint.records, true);
+ }
+
+ throw new NoSuchElementException();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+
+ provenanceRepo.registerEvents(iterable);
+ }
+
+ private void updateEventContentClaims(final ProvenanceEventBuilder builder, final FlowFile flowFile, final StandardRepositoryRecord repoRecord) {
+ final ContentClaim originalClaim = repoRecord.getOriginalClaim();
+ if (originalClaim == null) {
+ builder.setCurrentContentClaim(null, null, null, null, 0L);
+ } else {
+ builder.setCurrentContentClaim(originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize());
+ builder.setPreviousContentClaim(originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize());
+ }
+ }
+
+ @Override
+ public StandardProvenanceEventRecord enrich(final ProvenanceEventRecord rawEvent, final FlowFile flowFile) {
+ final StandardRepositoryRecord repoRecord = records.get(flowFile);
+ if (repoRecord == null) {
+ throw new FlowFileHandlingException(flowFile + " is not known in this session (" + toString() + ")");
+ }
+
+ final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent);
+ if (repoRecord.getCurrent() != null && repoRecord.getCurrentClaim() != null) {
+ final ContentClaim currentClaim = repoRecord.getCurrentClaim();
+ final long currentOffset = repoRecord.getCurrentClaimOffset();
+ final long size = flowFile.getSize();
+ recordBuilder.setCurrentContentClaim(currentClaim.getContainer(), currentClaim.getSection(), currentClaim.getId(), currentOffset, size);
+ }
+
+ if (repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) {
+ final ContentClaim originalClaim = repoRecord.getOriginalClaim();
+ final long originalOffset = repoRecord.getOriginal().getContentClaimOffset();
+ final long originalSize = repoRecord.getOriginal().getSize();
+ recordBuilder.setPreviousContentClaim(originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), originalOffset, originalSize);
+ }
+
+ final FlowFileQueue originalQueue = repoRecord.getOriginalQueue();
+ if (originalQueue != null) {
+ recordBuilder.setSourceQueueIdentifier(originalQueue.getIdentifier());
+ }
+
+ recordBuilder.setAttributes(repoRecord.getOriginalAttributes(), repoRecord.getUpdatedAttributes());
+ return recordBuilder.build();
+ }
+
+ private StandardProvenanceEventRecord enrich(final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) {
+ final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent);
+ final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid());
+ if (eventFlowFile != null) {
+ final StandardRepositoryRecord repoRecord = records.get(eventFlowFile);
+
+ if (repoRecord.getCurrent() != null && repoRecord.getCurrentClaim() != null) {
+ final ContentClaim currentClaim = repoRecord.getCurrentClaim();
+ final long currentOffset = repoRecord.getCurrentClaimOffset();
+ final long size = eventFlowFile.getSize();
+ recordBuilder.setCurrentContentClaim(currentClaim.getContainer(), currentClaim.getSection(), currentClaim.getId(), currentOffset, size);
+ }
+
+ if (repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) {
+ final ContentClaim originalClaim = repoRecord.getOriginalClaim();
+ final long originalOffset = repoRecord.getOriginal().getContentClaimOffset();
+ final long originalSize = repoRecord.getOriginal().getSize();
+ recordBuilder.setPreviousContentClaim(originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), originalOffset, originalSize);
+ }
+
+ final FlowFileQueue originalQueue = repoRecord.getOriginalQueue();
+ if (originalQueue != null) {
+ recordBuilder.setSourceQueueIdentifier(originalQueue.getIdentifier());
+ }
+ }
+
+ if (updateAttributes) {
+ final FlowFileRecord flowFileRecord = flowFileRecordMap.get(rawEvent.getFlowFileUuid());
+ if (flowFileRecord != null) {
+ final StandardRepositoryRecord record = records.get(flowFileRecord);
+ if (record != null) {
+ recordBuilder.setAttributes(record.getOriginalAttributes(), record.getUpdatedAttributes());
+ }
+ }
+ }
+
+ return recordBuilder.build();
+ }
+
+ /**
+ * Checks if the given event is a spurious FORK, meaning that the FORK has a
+ * single child and that child was removed in this session. This happens
+ * when a Processor calls #create(FlowFile) and then removes the created
+ * FlowFile.
+ *
+ * @param event the provenance event to check
+ * @param removedFlowFiles the UUIDs of the FlowFiles removed in this session
+ * @return true if the event is a spurious FORK that should be discarded, false otherwise
+ */
+ private boolean isSpuriousForkEvent(final ProvenanceEventRecord event, final Set<String> removedFlowFiles) {
+ if (event.getEventType() == ProvenanceEventType.FORK) {
+ final List<String> childUuids = event.getChildUuids();
+ if (childUuids != null && childUuids.size() == 1 && removedFlowFiles.contains(childUuids.get(0))) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+
+ /**
+ * Checks if the given event is a spurious ROUTE, meaning that the ROUTE indicates that a FlowFile
+ * was routed to a relationship with only 1 connection and that Connection is the Connection from which
+ * the FlowFile was pulled. I.e., the FlowFile was really routed nowhere.
+ *
+ * @param event the provenance event to check
+ * @param records the repository records for this session, keyed by FlowFile
+ * @return true if the event is a spurious ROUTE that should be discarded, false otherwise
+ */
+ private boolean isSpuriousRouteEvent(final ProvenanceEventRecord event, final Map<FlowFileRecord, StandardRepositoryRecord> records) {
+ if ( event.getEventType() == ProvenanceEventType.ROUTE ) {
+ final String relationshipName = event.getRelationship();
+ final Relationship relationship = new Relationship.Builder().name(relationshipName).build();
+ final Collection<Connection> connectionsForRelationship = this.context.getConnections(relationship);
+
+ // If the number of connections for this relationship is not 1, then we can't ignore this ROUTE event,
+ // as it may be cloning the FlowFile and adding to multiple connections.
+ if ( connectionsForRelationship.size() == 1 ) {
+ for ( final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : records.entrySet() ) {
+ final FlowFileRecord flowFileRecord = entry.getKey();
+ if ( event.getFlowFileUuid().equals(flowFileRecord.getAttribute(CoreAttributes.UUID.key())) ) {
+ final StandardRepositoryRecord repoRecord = entry.getValue();
+ if ( repoRecord.getOriginalQueue() == null ) {
+ return false;
+ }
+
+ final String originalQueueId = repoRecord.getOriginalQueue().getIdentifier();
+ final Connection destinationConnection = connectionsForRelationship.iterator().next();
+ final String destinationQueueId = destinationConnection.getFlowFileQueue().getIdentifier();
+ return originalQueueId.equals(destinationQueueId);
+ }
+ }
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public void rollback() {
+ rollback(false);
+ }
+
+ @Override
+ public void rollback(final boolean penalize) {
+ deleteOnCommit.clear();
+ if (records.isEmpty()) {
+ LOG.trace("{} was rolled back, but no events were performed by this ProcessSession", this);
+ acknowledgeRecords();
+ return;
+ }
+
+ resetWriteClaims();
+ resetReadClaim();
+
+ for (final StandardRepositoryRecord record : records.values()) {
+ // remove the working claim if it's different than the original.
+ removeTemporaryClaim(record);
+ }
+
+ final Set<RepositoryRecord> abortedRecords = new HashSet<>();
+ final Set<StandardRepositoryRecord> transferRecords = new HashSet<>();
+ for (final StandardRepositoryRecord record : records.values()) {
+ if (record.isMarkedForAbort()) {
+ removeContent(record.getWorkingClaim());
+ if (record.getCurrentClaim() != null && !record.getCurrentClaim().equals(record.getWorkingClaim())) {
+ // if working & original claim are same, don't remove twice; we only want to remove the original
+ // if it's different from the working. Otherwise, we remove two claimant counts. This causes
+ // an issue if we only updated the flowfile attributes.
+ removeContent(record.getCurrentClaim());
+ }
+ abortedRecords.add(record);
+ } else {
+ transferRecords.add(record);
+ }
+ }
+
+ // Put the FlowFiles that are not marked for abort back to their original queues
+ for (final StandardRepositoryRecord record : transferRecords) {
+ if (record.getOriginal() != null) {
+ final FlowFileQueue originalQueue = record.getOriginalQueue();
+ if (originalQueue != null) {
+ if (penalize) {
+ final long expirationEpochMillis = System.currentTimeMillis() + context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS);
+ final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getOriginal()).penaltyExpirationTime(expirationEpochMillis).build();
+ originalQueue.put(newFile);
+ } else {
+ originalQueue.put(record.getOriginal());
+ }
+ }
+ }
+ }
+
+ if (!abortedRecords.isEmpty()) {
+ try {
+ context.getFlowFileRepository().updateRepository(abortedRecords);
+ } catch (final IOException ioe) {
+ LOG.error("Unable to update FlowFile repository for aborted records due to {}", ioe.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.error("", ioe);
+ }
+ }
+ }
+
+ final Connectable connectable = context.getConnectable();
+ final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(connectable.getIdentifier());
+ flowFileEvent.setBytesRead(bytesRead.getValue());
+ flowFileEvent.setBytesWritten(bytesWritten.getValue());
+
+ // update event repository
+ try {
+ context.getFlowFileEventRepository().updateRepository(flowFileEvent);
+ } catch (final Exception e) {
+ LOG.error("Failed to update FlowFileEvent Repository due to " + e);
+ if (LOG.isDebugEnabled()) {
+ LOG.error("", e);
+ }
+ }
+
+ acknowledgeRecords();
+ resetState();
+ }
+
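+ // Decrements the claimant count for the given claim (no-op if the claim is null). The
+ // content itself is not destroyed here; see the comment in commit(Checkpoint) for why
+ // destruction must wait until the FlowFile Repository has been updated.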
+ private void removeContent(final ContentClaim claim) {
+ if (claim == null) {
+ return;
+ }
+
+ context.getContentRepository().decrementClaimantCount(claim);
+ }
+
+ /**
+ * Destroys a ContentClaim that was being written to but is no longer needed
+ *
+ * @param claim the claim to destroy; if null, this method does nothing
+ */
+ private void destroyContent(final ContentClaim claim) {
+ if (claim == null) {
+ return;
+ }
+
+ final int decrementedClaimCount = context.getContentRepository().decrementClaimantCount(claim);
+ if (decrementedClaimCount <= 0) {
+ resetWriteClaims(); // Have to ensure that we are not currently writing to the claim before we can destroy it.
+ context.getContentRepository().remove(claim);
+ }
+ }
+
+ private void resetState() {
+ records.clear();
+ recursionSet.clear();
+ contentSizeIn = 0L;
+ contentSizeOut = 0L;
+ flowFilesIn = 0;
+ flowFilesOut = 0;
+ removedCount = 0;
+ removedBytes = 0L;
+ bytesRead.setValue(0L);
+ bytesWritten.setValue(0L);
+ connectionCounts.clear();
+ createdFlowFiles.clear();
+ removedFlowFiles.clear();
+ globalCounters.clear();
+ localCounters.clear();
+
+ generatedProvenanceEvents.clear();
+ forkEventBuilders.clear();
+ provenanceReporter.clear();
+
+ processingStartTime = System.nanoTime();
+ }
+
+ private void acknowledgeRecords() {
+ for (final Map.Entry<Connection, Set<FlowFileRecord>> entry : unacknowledgedFlowFiles.entrySet()) {
+ entry.getKey().getFlowFileQueue().acknowledge(entry.getValue());
+ }
+ unacknowledgedFlowFiles.clear();
+ }
+
+ private String summarizeEvents(final Checkpoint checkpoint) {
+ final Map<Relationship, Set<String>> transferMap = new HashMap<>(); // relationship to flowfile ID's
+ final Set<String> modifiedFlowFileIds = new HashSet<>();
+ int largestTransferSetSize = 0;
+
+ for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) {
+ final FlowFile flowFile = entry.getKey();
+ final StandardRepositoryRecord record = entry.getValue();
+
+ final Relationship relationship = record.getTransferRelationship();
+ if (Relationship.SELF.equals(relationship)) {
+ continue;
+ }
+
+ Set<String> transferIds = transferMap.get(relationship);
+ if (transferIds == null) {
+ transferIds = new HashSet<>();
+ transferMap.put(relationship, transferIds);
+ }
+ transferIds.add(flowFile.getAttribute(CoreAttributes.UUID.key()));
+ largestTransferSetSize = Math.max(largestTransferSetSize, transferIds.size());
+
+ final ContentClaim workingClaim = record.getWorkingClaim();
+ if (workingClaim != null && workingClaim != record.getOriginalClaim() && record.getTransferRelationship() != null) {
+ modifiedFlowFileIds.add(flowFile.getAttribute(CoreAttributes.UUID.key()));
+ }
+ }
+
+ final int numRemoved = checkpoint.removedFlowFiles.size();
+ final int numModified = modifiedFlowFileIds.size();
+ final int numCreated = checkpoint.createdFlowFiles.size();
+
+ final StringBuilder sb = new StringBuilder(512);
+ if (!LOG.isDebugEnabled() && (largestTransferSetSize > VERBOSE_LOG_THRESHOLD || numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) {
+ if (numCreated > 0) {
+ sb.append("created ").append(numCreated).append(" FlowFiles, ");
+ }
+ if (numModified > 0) {
+ sb.append("modified ").append(modifiedFlowFileIds.size()).append(" FlowFiles, ");
+ }
+ if (numRemoved > 0) {
+ sb.append("removed ").append(numRemoved).append(" FlowFiles, ");
+ }
+ for (final Map.Entry<Relationship, Set<String>> entry : transferMap.entrySet()) {
+ if (entry.getKey() != null) {
+ sb.append("Transferred ").append(entry.getValue().size()).append(" FlowFiles");
+
+ final Relationship relationship = entry.getKey();
+ if (relationship != Relationship.ANONYMOUS) {
+ sb.append(" to '").append(relationship.getName()).append("', ");
+ }
+ }
+ }
+ } else {
+ if (numCreated > 0) {
+ sb.append("created FlowFiles ").append(checkpoint.createdFlowFiles).append(", ");
+ }
+ if (numModified > 0) {
+ sb.append("modified FlowFiles ").append(modifiedFlowFileIds).append(", ");
+ }
+ if (numRemoved > 0) {
+ sb.append("removed FlowFiles ").append(checkpoint.removedFlowFiles).append(", ");
+ }
+ for (final Map.Entry<Relationship, Set<String>> entry : transferMap.entrySet()) {
+ if (entry.getKey() != null) {
+ sb.append("Transferred FlowFiles ").append(entry.getValue());
+
+ final Relationship relationship = entry.getKey();
+ if (relationship != Relationship.ANONYMOUS) {
+ sb.append(" to '").append(relationship.getName()).append("', ");
+ }
+ }
+ }
+ }
+
+ if (sb.length() > 2 && sb.subSequence(sb.length() - 2, sb.length()).equals(", ")) {
+ sb.delete(sb.length() - 2, sb.length());
+ }
+
+ // don't add processing time if we did nothing, because we don't log the summary anyway
+ if (sb.length() > 0) {
+ final long processingNanos = checkpoint.processingTime;
+ sb.append(", Processing Time = ");
+ formatNanos(processingNanos, sb);
+ }
+
+ return sb.toString();
+ }
+
+ private void formatNanos(final long nanos, final StringBuilder sb) {
+ final long seconds = (nanos > 1000000000L) ? (nanos / 1000000000L) : 0L;
+ long millis = (nanos > 1000000L) ? (nanos / 1000000L) : 0L;
+ final long nanosLeft = nanos % 1000000L;
+
+ if (seconds > 0) {
+ sb.append(seconds).append(" seconds");
+ }
+ if (millis > 0) {
+ if (seconds > 0) {
+ sb.append(", ");
+ millis -= seconds * 1000L;
+ }
+
+ sb.append(millis).append(" millis");
+ }
+ if (seconds == 0 && millis == 0) {
+ sb.append(nanosLeft).append(" nanos");
+ }
+
+ sb.append(" (").append(nanos).append(" nanos)");
+ }
+
+ private void incrementConnectionInputCounts(final Connection connection, final RepositoryRecord record) {
+ StandardFlowFileEvent connectionEvent = connectionCounts.get(connection);
+ if (connectionEvent == null) {
+ connectionEvent = new StandardFlowFileEvent(connection.getIdentifier());
+ connectionCounts.put(connection, connectionEvent);
+ }
+ connectionEvent.setContentSizeIn(connectionEvent.getContentSizeIn() + record.getCurrent().getSize());
+ connectionEvent.setFlowFilesIn(connectionEvent.getFlowFilesIn() + 1);
+ }
+
+ private void incrementConnectionOutputCounts(final Connection connection, final FlowFileRecord record) {
+ StandardFlowFileEvent connectionEvent = connectionCounts.get(connection);
+ if (connectionEvent == null) {
+ connectionEvent = new StandardFlowFileEvent(connection.getIdentifier());
+ connectionCounts.put(connection, connectionEvent);
+ }
+ connectionEvent.setContentSizeOut(connectionEvent.getContentSizeOut() + record.getSize());
+ connectionEvent.setFlowFilesOut(connectionEvent.getFlowFilesOut() + 1);
+ }
+
+ private void registerDequeuedRecord(final FlowFileRecord flowFile, final Connection connection) {
+ final StandardRepositoryRecord record = new StandardRepositoryRecord(connection.getFlowFileQueue(), flowFile);
+ records.put(flowFile, record);
+ flowFilesIn++;
+ contentSizeIn += flowFile.getSize();
+
+ Set<FlowFileRecord> set = unacknowledgedFlowFiles.get(connection);
+ if (set == null) {
+ set = new HashSet<>();
+ unacknowledgedFlowFiles.put(connection, set);
+ }
+ set.add(flowFile);
+
+ incrementConnectionOutputCounts(connection, flowFile);
+ }
+
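+ // Adjusts the named counter. When 'immediate' is true, the adjustment is applied to the
+ // context right away; otherwise it is accumulated in the local and global counter maps
+ // and applied when this session commits (see commit(Checkpoint)).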
+ @Override
+ public void adjustCounter(final String name, final long delta, final boolean immediate) {
+ if (immediate) {
+ context.adjustCounter(name, delta);
+ return;
+ }
+
+ adjustCounter(name, delta, localCounters);
+ adjustCounter(name, delta, globalCounters);
+ }
+
+ private void adjustCounter(final String name, final long delta, final Map<String, Long> map) {
+ Long curVal = map.get(name);
+ if (curVal == null) {
+ curVal = Long.valueOf(0L);
+ }
+
+ final long newValue = curVal.longValue() + delta;
+ map.put(name, Long.valueOf(newValue));
+ }
+
+ @Override
+ public FlowFile get() {
+ final List<Connection> connections = context.getPollableConnections();
+ final int numConnections = connections.size();
+ for (int numAttempts = 0; numAttempts < numConnections; numAttempts++) {
+ final Connection conn = connections.get(context.getNextIncomingConnectionIndex() % connections.size());
+ final Set<FlowFileRecord> expired = new HashSet<>();
+ final FlowFileRecord flowFile = conn.getFlowFileQueue().poll(expired);
+ removeExpired(expired, conn);
+
+ if (flowFile != null) {
+ registerDequeuedRecord(flowFile, conn);
+ return flowFile;
+ }
+ }
+
+ return null;
+ }
+
+ @Override
+ public List<FlowFile> get(final int maxResults) {
+ if (maxResults < 0) {
+ throw new IllegalArgumentException("maxResults cannot be negative");
+ }
+ if (maxResults == 0) {
+ return Collections.emptyList();
+ }
+
+ return get(new QueuePoller() {
+ @Override
+ public List<FlowFileRecord> poll(final FlowFileQueue queue, final Set<FlowFileRecord> expiredRecords) {
+ return queue.poll(new FlowFileFilter() {
+ int polled = 0;
+
+ @Override
+ public FlowFileFilterResult filter(final FlowFile flowFile) {
+ if (++polled <= maxResults) {
+ return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+ } else {
+ return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
+ }
+ }
+ }, expiredRecords);
+ }
+ }, false);
+ }
+
+ @Override
+ public List<FlowFile> get(final FlowFileFilter filter) {
+ return get(new QueuePoller() {
+ @Override
+ public List<FlowFileRecord> poll(final FlowFileQueue queue, final Set<FlowFileRecord> expiredRecords) {
+ return queue.poll(filter, expiredRecords);
+ }
+ }, true);
+ }
+
+ private List<FlowFile> get(final QueuePoller poller, final boolean lockAllQueues) {
+ final List<Connection> connections = context.getPollableConnections();
+ if (lockAllQueues) {
+ for (final Connection connection : connections) {
+ connection.lock();
+ }
+ }
+
+ try {
+ for (final Connection conn : connections) {
+ final Set<FlowFileRecord> expired = new HashSet<>();
+ final List<FlowFileRecord> newlySelected = poller.poll(conn.getFlowFileQueue(), expired);
+ removeExpired(expired, conn);
+
+ if (newlySelected.isEmpty() && expired.isEmpty()) {
+ continue;
+ }
+
+ for (final FlowFileRecord flowFile : newlySelected) {
+ registerDequeuedRecord(flowFile, conn);
+ }
+
+ return new ArrayList<FlowFile>(newlySelected);
+ }
+
+ return new ArrayList<>();
+ } finally {
+ if (lockAllQueues) {
+ for (final Connection connection : connections) {
+ connection.unlock();
+ }
+ }
+ }
+ }
+
+ @Override
+ public QueueSize getQueueSize() {
+ int flowFileCount = 0;
+ long byteCount = 0L;
+ for (final Connection conn : context.getPollableConnections()) {
+ final QueueSize queueSize = conn.getFlowFileQueue().getActiveQueueSize();
+ flowFileCount += queueSize.getObjectCount();
+ byteCount += queueSize.getByteCount();
+ }
+ return new QueueSize(flowFileCount, byteCount);
+ }
+
+ @Override
- public Set<Relationship> getAvailableRelationships() {
- return context.getAvailableRelationships();
- }
-
- @Override
+ public FlowFile create() {
+ final Map<String, String> attrs = new HashMap<>();
+ attrs.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime()));
+ attrs.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
+ attrs.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
+
+ final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
+ .addAttributes(attrs)
+ .build();
+ final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
+ record.setWorking(fFile, attrs);
+ records.put(fFile, record);
+ createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
+ return fFile;
+ }
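+
+ // For context, Processors drive this class through the ProcessSession interface from
+ // onTrigger. A minimal sketch (REL_SUCCESS is a hypothetical Relationship defined by the
+ // calling Processor, not by this class; the framework typically commits the session
+ // once onTrigger returns):
+ //
+ // FlowFile flowFile = session.create();
+ // flowFile = session.putAttribute(flowFile, "filename", "example.txt");
+ // session.transfer(flowFile, REL_SUCCESS);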
+
+ @Override
+ public FlowFile clone(final FlowFile example) {
+ return clone(example, 0L, example.getSize());
+ }
+
+ @Override
+ public FlowFile clone(final FlowFile example, final long offset, final long size) {
+ validateRecordState(example);
+ final StandardRepositoryRecord exampleRepoRecord = records.get(example);
+ final FlowFileRecord currRec = exampleRepoRecord.getCurrent();
+ final ContentClaim claim = exampleRepoRecord.getCurrentClaim();
+ if (offset + size > example.getSize()) {
+ throw new FlowFileHandlingException("Specified offset of " + offset + " and size " + size + " exceeds size of " + example.toString());
+ }
+
+ final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec);
+ builder.id(context.getNextFlowFileSequence());
+ builder.contentClaimOffset(currRec.getContentClaimOffset() + offset);
+ builder.size(size);
+
+ final String newUuid = UUID.randomUUID().toString();
+ builder.addAttribute(CoreAttributes.UUID.key(), newUuid);
+
+ final FlowFileRecord clone = builder.build();
+ if (claim != null) {
+ context.getContentRepository().incrementClaimaintCount(claim);
+ }
+ final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
+ record.setWorking(clone, CoreAttributes.UUID.key(), newUuid);
+ records.put(clone, record);
+
+ if (offset == 0L && size == example.getSize()) {
+ provenanceReporter.clone(example, clone);
+ } else {
+ registerForkEvent(example, clone);
+ }
+
+ return clone;
+ }
+
+ private void registerForkEvent(final FlowFile parent, final FlowFile child) {
+ ProvenanceEventBuilder eventBuilder = forkEventBuilders.get(parent);
+ if (eventBuilder == null) {
+ eventBuilder = context.getProvenanceRepository().eventBuilder();
+ eventBuilder.setEventType(ProvenanceEventType.FORK);
+
+ eventBuilder.setFlowFileEntryDate(parent.getEntryDate());
+ eventBuilder.setLineageIdentifiers(parent.getLineageIdentifiers());
+ eventBuilder.setLineageStartDate(parent.getLineageStartDate());
+ eventBuilder.setFlowFileUUID(parent.getAttribute(CoreAttributes.UUID.key()));
+
+ eventBuilder.setComponentId(context.getConnectable().getIdentifier());
+
+ final Connectable connectable = context.getConnectable();
+ final String processorType;
+ if (connectable instanceof ProcessorNode) {
+ processorType = ((ProcessorNode) connectable).getProcessor().getClass().getSimpleName();
+ } else {
+ processorType = connectable.getClass().getSimpleName();
+ }
+ eventBuilder.setComponentType(processorType);
+ eventBuilder.addParentFlowFile(parent);
+
+ updateEventContentClaims(eventBuilder, parent, records.get(parent));
+ forkEventBuilders.put(parent, eventBuilder);
+ }
+
+ eventBuilder.addChildFlowFile(child);
+ }
+
+ private void registerJoinEvent(final FlowFile child, final Collection<FlowFile> parents) {
+ final ProvenanceEventRecord eventRecord = provenanceReporter.generateJoinEvent(parents, child);
+ List<ProvenanceEventRecord> existingRecords = generatedProvenanceEvents.get(child);
+ if (existingRecords == null) {
+ existingRecords = new ArrayList<>();
+ generatedProvenanceEvents.put(child, existingRecords);
+ }
+ existingRecords.add(eventRecord);
+ }
+
+ @Override
+ public FlowFile penalize(final FlowFile flowFile) {
+ validateRecordState(flowFile);
+ final StandardRepositoryRecord record = records.get(flowFile);
+ final long expirationEpochMillis = System.currentTimeMillis() + context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS);
+ final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).penaltyExpirationTime(expirationEpochMillis).build();
+ record.setWorking(newFile);
+ return newFile;
+ }
+
+ @Override
+ public FlowFile putAttribute(final FlowFile flowFile, final String key, final String value) {
+ validateRecordState(flowFile);
+
+ if (CoreAttributes.UUID.key().equals(key)) {
+ return flowFile;
+ }
+
+ final StandardRepositoryRecord record = records.get(flowFile);
+ final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).addAttribute(key, value).build();
+ record.setWorking(newFile, key, value);
+
+ return newFile;
+ }
+
+ @Override
+ public FlowFile putAllAttributes(final FlowFile flowFile, final Map<String, String> attributes) {
+ validateRecordState(flowFile);
+ final StandardRepositoryRecord record = records.get(flowFile);
+
+ final String originalUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
+
+ final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).addAttributes(attributes);
+ // Ignore the uuid attribute, if passed in
+ ffBuilder.addAttribute(CoreAttributes.UUID.key(), originalUuid);
+ final FlowFileRecord newFile = ffBuilder.build();
+
+ record.setWorking(newFile, attributes);
+ return newFile;
+ }
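+
+ // Because FlowFiles are immutable, callers must keep the returned reference (a sketch;
+ // 'session' and 'flowFile' assumed in scope):
+ // flowFile = session.putAttribute(flowFile, "filename", "report.csv");
+ // flowFile = session.putAllAttributes(flowFile, Collections.singletonMap("mime.type", "text/csv"));
+ // Attempts to overwrite the "uuid" attribute are silently ignored, per the guards above.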
+
+ @Override
+ public FlowFile removeAttribute(final FlowFile flowFile, final String key) {
+ validateRecordState(flowFile);
+
+ if (CoreAttributes.UUID.key().equals(key)) {
+ return flowFile;
+ }
+
+ final StandardRepositoryRecord record = records.get(flowFile);
+ final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(key).build();
+ record.setWorking(newFile, key, null);
+ return newFile;
+ }
+
+ @Override
+ public FlowFile removeAllAttributes(final FlowFile flowFile, final Set<String> keys) {
+ validateRecordState(flowFile);
+
+ if (keys == null) {
+ return flowFile;
+ }
+
+ final Set<String> keysToRemove;
+ if (keys.contains(CoreAttributes.UUID.key())) {
+ keysToRemove = new HashSet<>(keys);
+ keysToRemove.remove(CoreAttributes.UUID.key());
+ } else {
+ keysToRemove = keys;
+ }
+
+ final StandardRepositoryRecord record = records.get(flowFile);
+ final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keysToRemove).build();
+
+ final Map<String, String> updatedAttrs = new HashMap<>();
+ for (final String key : keysToRemove) {
+ updatedAttrs.put(key, null);
+ }
+
+ record.setWorking(newFile, updatedAttrs);
+ return newFile;
+ }
+
+ @Override
+ public FlowFile removeAllAttributes(final FlowFile flowFile, final Pattern keyPattern) {
+ validateRecordState(flowFile);
+ final StandardRepositoryRecord record = records.get(flowFile);
+ final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keyPattern).build();
+
+ if (keyPattern == null) {
+ record.setWorking(newFile);
+ } else {
+ final Map<String, String> curAttrs = record.getCurrent().getAttributes();
+
+ final Map<String, String> removed = new HashMap<>();
+ for (final String key : curAttrs.keySet()) {
+ if (CoreAttributes.UUID.key().equals(key)) {
+ continue;
+ }
+
+ if (keyPattern.matcher(key).matches()) {
+ removed.put(key, null);
+ }
+ }
+
+ record.setWorking(newFile, removed);
+ }
+
+ return newFile;
+ }
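+
+ // Illustrative usage (a sketch; 'session' and 'flowFile' assumed in scope): strip all
+ // temporary attributes by regex; "uuid" is never removed:
+ // flowFile = session.removeAllAttributes(flowFile, Pattern.compile("tmp\\..*"));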
+
+ @Override
+ public void transfer(final FlowFile flowFile, final Relationship relationship) {
+ validateRecordState(flowFile);
+ final StandardRepositoryRecord record = records.get(flowFile);
+ record.setTransferRelationship(relationship);
+ final int numDestinations = context.getConnections(relationship).size();
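+ // When a Relationship has more than one outgoing Connection, the FlowFile is cloned to each
+ // of them on commit, so the session's counters are scaled by the destination count (never
+ // less than 1).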
+ final int multiplier = Math.max(1, numDestinations);
+
+ boolean autoTerminated = false;
+ boolean selfRelationship = false;
+ if (numDestinations == 0 && context.getConnectable().isAutoTerminated(relationship)) {
+ // auto terminated.
+ autoTerminated = true;
+ } else if (numDestinations == 0 && relationship == Relationship.SELF) {
+ selfRelationship = true;
+ }
+
+ if (autoTerminated) {
+ removedCount += multiplier;
+ removedBytes += flowFile.getSize();
+ } else if (!selfRelationship) {
+ flowFilesOut += multiplier;
+ contentSizeOut += flowFile.getSize() * multiplier;
+ }
+ }
+
+ @Override
+ public void transfer(final FlowFile flowFile) {
+ validateRecordState(flowFile);
+ final StandardRepositoryRecord record = records.get(flowFile);
+ if (record.getOriginalQueue() == null) {
+ throw new IllegalArgumentException("Cannot transfer FlowFiles that were created in this Session back to self");
+ }
+ record.setTransferRelationship(Relationship.SELF);
+ }
+
+ @Override
+ public void transfer(final Collection<FlowFile> flowFiles) {
+ for (final FlowFile flowFile : flowFiles) {
+ transfer(flowFile);
+ }
+ }
+
+ @Override
+ public void transfer(final Collection<FlowFile> flowFiles, final Relationship relationship) {
+ validateRecordState(flowFiles);
+
+ boolean autoTerminated = false;
+ boolean selfRelationship = false;
+ final int numDestinations = context.getConnections(relationship).size();
+ if (numDestinations == 0 && context.getConnectable().isAutoTerminated(relationship)) {
+ // auto terminated.
+ autoTerminated = true;
+ } else if (numDestinations == 0 && relationship == Relationship.SELF) {
+ selfRelationship = true;
+ }
+
+ final int multiplier = Math.max(1, numDestinations);
+
+ long contentSize = 0L;
+ for (final FlowFile flowFile : flowFiles) {
+ final StandardRepositoryRecord record = records.get(flowFile);
+ record.setTransferRelationship(relationship);
+ contentSize += flowFile.getSize();
+ }
+
+ if (autoTerminated) {
+ removedCount += multiplier * flowFiles.size();
+ removedBytes += contentSize;
+ } else if (!selfRelationship) {
+ flowFilesOut += multiplier * flowFiles.size();
+ contentSizeOut += multiplier * contentSize;
+ }
+ }
+
+ @Override
+ public void remove(final FlowFile flowFile) {
+ validateRecordState(flowFile);
+ final StandardRepositoryRecord record = records.get(flowFile);
+ record.markForDelete();
+ removedFlowFiles.add(flowFile.getAttribute(CoreAttributes.UUID.key()));
+
+ // if the original queue is null, the FlowFile was created in this session, so we
+ // do not want to count it toward the removed count.
+ if (record.getOriginalQueue() == null) {
+ // if we've generated any Fork events, remove them because the FlowFile was created
+ // and then removed in this session.
+ generatedProvenanceEvents.remove(flowFile);
+ removeForkEvents(flowFile);
+ } else {
+ removedCount++;
+ removedBytes += flowFile.getSize();
+ provenanceReporter.drop(flowFile, flowFile.getAttribute(CoreAttributes.DISCARD_REASON.key()));
+ }
+ }
+
+ @Override
+ public void remove(final Collection<FlowFile> flowFiles) {
+ validateRecordState(flowFiles);
+ for (final FlowFile flowFile : flowFiles) {
+ final StandardRepositoryRecord record = records.get(flowFile);
+ record.markForDelete();
+ removedFlowFiles.add(flowFile.getAttribute(CoreAttributes.UUID.key()));
+
+ // if the original queue is null, the FlowFile was created in this session, so we
+ // do not want to count it toward the removed count.
+ if (record.getOriginalQueue() == null) {
+ generatedProvenanceEvents.remove(flowFile);
+ removeForkEvents(flowFile);
+ } else {
+ removedCount++;
+ removedBytes += flowFile.getSize();
+ provenanceReporter.drop(flowFile, flowFile.getAttribute(CoreAttributes.DISCARD_REASON.key()));
+ }
+ }
+ }
+
+ private void removeForkEvents(final FlowFile flowFile) {
+ for (final ProvenanceEventBuilder builder : forkEventBuilders.values()) {
+ final ProvenanceEventRecord event = builder.build();
+
+ if (event.getEventType() == ProvenanceEventType.FORK) {
+ builder.removeChildFlowFile(flowFile);
+ }
+ }
+ }
+
+ public void expireFlowFiles() {
+ final Set<FlowFileRecord> expired = new HashSet<>();
+ final FlowFileFilter filter = new FlowFileFilter() {
+ @Override
+ public FlowFileFilterResult filter(final FlowFile flowFile) {
+ return FlowFileFilterResult.REJECT_AND_CONTINUE;
+ }
+ };
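+
+ // The filter above rejects every FlowFile, so poll() selects nothing for processing; its
+ // useful side effect is that the queue deposits any FlowFiles that have passed their
+ // expiration threshold into the 'expired' set, which is then drained until empty.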
+
+ for (final Connection conn : context.getConnectable().getIncomingConnections()) {
+ do {
+ expired.clear();
+ conn.getFlowFileQueue().poll(filter, expired);
+ removeExpired(expired, conn);
+ } while (!expired.isEmpty());
+ }
+ }
+
+ private void removeExpired(final Set<FlowFileRecord> flowFiles, final Connection connection) {
+ if (flowFiles.isEmpty()) {
+ return;
+ }
+
+ LOG.info("{} {} FlowFiles have expired and will be removed", new Object[]{this, flowFiles.size()});
+ final List<RepositoryRecord> expiredRecords = new ArrayList<>(flowFiles.size());
+
+ final String processorType;
+ final Connectable connectable = context.getConnectable();
+ if (connectable instanceof ProcessorNode) {
+ final ProcessorNode procNode = (ProcessorNode) connectable;
+ processorType = procNode.getProcessor().getClass().getSimpleName();
+ } else {
+ processorType = connectable.getClass().getSimpleName();
+ }
+
+ final StandardProvenanceReporter expiredReporter = new StandardProvenanceReporter(connectable.getIdentifier(),
+ processorType, context.getProvenanceRepository(), this);
+
+ final Map<String, FlowFileRecord> recordIdMap = new HashMap<>();
+ for (final FlowFileRecord flowFile : flowFiles) {
+ recordIdMap.put(flowFile.getAttribute(CoreAttributes.UUID.key()), flowFile);
+
+ final StandardRepositoryRecord record = new StandardRepositoryRecord(connection.getFlowFileQueue(), flowFile);
+ record.markForDelete();
+ expiredRecords.add(record);
+ expiredReporter.expire(flowFile, "Expiration Threshold = " + connection.getFlowFileQueue().getFlowFileExpiration());
+ removeContent(flowFile.getContentClaim());
+
+ final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
+ final Object terminator = (connectable instanceof ProcessorNode) ? ((ProcessorNode) connectable).getProcessor() : connectable;
+ LOG.info("{} terminated by {} due to FlowFile expiration; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
+ }
+
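+ // The events generated by the expired reporter carry only basic FlowFile information, so they
+ // are wrapped in an iterator that enriches each event with the record's content claim and
+ // attributes before the batch is registered with the Provenance Repository.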
+ try {
+ final Iterable<ProvenanceEventRecord> iterable = new Iterable<ProvenanceEventRecord>() {
+ @Override
+ public Iterator<ProvenanceEventRecord> iterator() {
+ final Iterator<ProvenanceEventRecord> expiredEventIterator = expiredReporter.getEvents().iterator();
+ final Iterator<ProvenanceEventRecord> enrichingIterator = new Iterator<ProvenanceEventRecord>() {
+ @Override
+ public boolean hasNext() {
+ return expiredEventIterator.hasNext();
+ }
+
+ @Override
+ public ProvenanceEventRecord next() {
+ final ProvenanceEventRecord event = expiredEventIterator.next();
+ final StandardProvenanceEventRecord.Builder enriched = new StandardProvenanceEventRecord.Builder().fromEvent(event);
+ final FlowFileRecord record = recordIdMap.get(event.getFlowFileUuid());
+ if (record == null) {
+ return null;
+ }
+
+ final ContentClaim claim = record.getContentClaim();
+ if (claim != null) {
+ enriched.setCurrentContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize());
+ enriched.setPreviousContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize());
+ }
+
+ enriched.setAttributes(record.getAttributes(), Collections.<String, String>emptyMap());
+ return enriched.build();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ return enrichingIterator;
+ }
+ };
+
+ context.getProvenanceRepository().registerEvents(iterable);
+ context.getFlowFileRepository().updateRepository(expiredRecords);
+ } catch (final IOException e) {
+ LOG.error("Failed to update FlowFile Repository to record expired records due to {}", e);
+ }
+ }
+
+ private InputStream getInputStream(final FlowFile flowFile, final ContentClaim claim, final long offset) throws ContentNotFoundException {
+ // If there's no content, don't bother going to the Content Repository because it is generally expensive and we know
+ // that there is no actual content.
+ if (flowFile.getSize() == 0L) {
+ return new ByteArrayInputStream(new byte[0]);
+ }
+
+ try {
+ // If the recursion set is empty, we can use the same input stream that we already have open. However, if
+ // the recursion set is NOT empty, we can't do this because we may be reading the input of FlowFile 1 while in the
+ // callback for reading FlowFile 1 and if we used the same stream we'd be destroying the ability to read from FlowFile 1.
+ if (recursionSet.isEmpty()) {
+ if (currentReadClaim == claim) {
+ if (currentReadClaimStream != null && currentReadClaimStream.getStreamLocation() <= offset) {
+ final long bytesToSkip = offset - currentReadClaimStream.getStreamLocation();
+ if (bytesToSkip > 0) {
+ StreamUtils.skip(currentReadClaimStream, bytesToSkip);
+ }
+
+ return new NonCloseableInputStream(currentReadClaimStream);
+ }
+ }
+
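+ // Either this is a different claim, or the cached stream has already read past the requested
+ // offset; in both cases a fresh stream must be opened against the Content Repository.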
+ final InputStream rawInStream = context.getContentRepository().read(claim);
+
+ if (currentReadClaimStream != null) {
+ currentReadClaimStream.close();
+ }
+
+ currentReadClaim = claim;
+ currentReadClaimStream = new ByteCountingInputStream(rawInStream, new LongHolder(0L));
+ StreamUtils.skip(currentReadClaimStream, offset);
+
+ // Use a non-closeable stream because we want to keep it open after the callback has finished so that we can
+ // reuse the same InputStream for the next FlowFile
+ return new NonCloseableInputStream(currentReadClaimStream);
+ } else {
+ final InputStream rawInStream = context.getContentRepository().read(claim);
+ StreamUtils.skip(rawInStream, offset);
+ return rawInStream;
+ }
+ } catch (final ContentNotFoundException cnfe) {
+ throw cnfe;
+ } catch (final EOFException eof) {
+ throw new ContentNotFoundException(claim, eof);
+ } catch (final IOException ioe) {
+ throw new FlowFileAccessException("Failed to read content of " + flowFile, ioe);
+ }
+ }
+
+ @Override
+ public void read(final FlowFile source, final InputStreamCallback reader) {
+ validateRecordState(source);
+ final StandardRepositoryRecord record = records.get(source);
+
+ try {
+ ensureNotAppending(record.getCurrentClaim());
+ } catch (final IOException e) {
+ throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e);
+ }
+
+ try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset());
+ final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
+ final ByteCountingInputStream countingStream = new ByteCountingInputStream(limitedIn, this.bytesRead)) {
+
+ // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
+ // Processor code. As a result, we have the FlowFileAccessInputStream, which catches IOException from the repository
+ // and translates it into either FlowFileAccessException or ContentNotFoundException. We keep track of any
+ // ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it
+ // but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it.
+ final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim());
+ boolean cnfeThrown = false;
+
+ try {
+ recursionSet.add(source);
+ reader.process(ffais);
+ } catch (final ContentNotFoundException cnfe) {
+ cnfeThrown = true;
+ throw cnfe;
+ } finally {
+ recursionSet.remove(source);
+
+ // if cnfeThrown is true, we don't need to re-throw the Exception; it will propagate.
+ if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
+ throw ffais.getContentNotFoundException();
+ }
+ }
+ } catch (final ContentNotFoundException nfe) {
+ handleContentNotFound(nfe, record);
+ } catch (final IOException ex) {
+ throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ex.toString(), ex);
+ }
+ }
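+
+ // Illustrative usage from processor code (a sketch; 'session' and 'flowFile' assumed in
+ // scope, not part of this class):
+ // session.read(flowFile, new InputStreamCallback() {
+ //     @Override
+ //     public void process(final InputStream in) throws IOException {
+ //         // consume 'in' here; the session manages and reuses the underlying stream
+ //     }
+ // });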
+
+ @Override
+ public FlowFile merge(final Collection<FlowFile> sources, final FlowFile destination) {
+ return merge(sources, destination, null, null, null);
+ }
+
+ @Override
+ public FlowFile merge(final Collection<FlowFile> sources, final FlowFile destination, final byte[] header, final byte[] footer, final byte[] demarcator) {
+ validateRecordState(sources);
+ validateRecordState(destination);
+ if (sources.contains(destination)) {
+ throw new IllegalArgumentException("Destination cannot be within sources");
+ }
+
+ final Collection<StandardRepositoryRecord> sourceRecords = new ArrayList<>();
+ for (final FlowFile source : sources) {
+ final StandardRepositoryRecord record = records.get(source);
+ sourceRecords.add(record);
+
+ try {
+ ensureNotAppending(record.getCurrentClaim());
+ } catch (final IOException e) {
+ throw new FlowFileAccessException("Unable to read from source " + source + " due to " + e.toString(), e);
+ }
+ }
+
+ final StandardRepositoryRecord destinationRecord = records.get(destination);
+ final ContentRepository contentRepo = context.getContentRepository();
+ final ContentClaim newClaim;
+ try {
+ newClaim = contentRepo.create(context.getConnectable().isLossTolerant());
+ claimLog.debug("Creating ContentClaim {} for 'merge' for {}", newClaim, desti
<TRUNCATED>