You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/19 19:15:40 UTC

[06/59] [abbrv] [partial] incubator-nifi git commit: Reworked overall directory structure to make releasing nifi vs maven plugins easier

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java
deleted file mode 100644
index f36a459..0000000
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.nifi.connectable.Connectable;
-import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.util.Connectables;
-
-/**
- * {@link WorkerQueue} implementation that hands out {@link Worker}s for
- * event-driven components.
- * <p>
- * A component only receives work after {@link #resumeWork(Connectable)} has
- * registered it; every {@link #offer(Connectable)} for a registered component
- * counts as one event. {@link #poll(long, TimeUnit)} blocks until a worker
- * that is ready to run is available or the timeout elapses. The worker map and
- * the contents of the ready queue are only touched while synchronized on a
- * private monitor.
- * </p>
- */
-public class EventDrivenWorkerQueue implements WorkerQueue {
-
-    private final Object workMonitor = new Object();
-
-    private final Map<Connectable, Worker> workerMap = new HashMap<>();   // protected by synchronizing on workMonitor
-    private final WorkerReadyQueue workerQueue;
-
-    /**
-     * @param clustered initial value of the 'clustered' flag (see {@link #setClustered(boolean)})
-     * @param primary initial value of the 'primary' flag (see {@link #setPrimary(boolean)})
-     * @param scheduler used to look up the number of active threads of a component
-     */
-    public EventDrivenWorkerQueue(final boolean clustered, final boolean primary, final ProcessScheduler scheduler) {
-        workerQueue = new WorkerReadyQueue(scheduler);
-        workerQueue.setClustered(clustered);
-        workerQueue.setPrimary(primary);
-    }
-
-    /**
-     * Updates the 'clustered' flag. Isolated processors are held back while
-     * this flag is {@code true} and the 'primary' flag is {@code false}.
-     */
-    @Override
-    public void setClustered(final boolean clustered) {
-        workerQueue.setClustered(clustered);
-    }
-
-    /**
-     * Updates the 'primary' flag. Isolated processors are held back while the
-     * 'clustered' flag is {@code true} and this flag is {@code false}.
-     */
-    @Override
-    public void setPrimary(final boolean primary) {
-        workerQueue.setPrimary(primary);
-    }
-
-    /**
-     * Waits up to the given timeout for a worker that is ready to run.
-     * <p>
-     * The returned worker's event count is decremented; if events remain, the
-     * worker is placed back on the ready queue before returning.
-     * </p>
-     *
-     * @param timeout maximum time to wait; a non-positive value returns {@code null} without polling
-     * @param timeUnit unit of {@code timeout}
-     * @return a ready worker, or {@code null} if the timeout elapsed or the
-     * calling thread was interrupted (the interrupt status is preserved)
-     */
-    @Override
-    public Worker poll(final long timeout, final TimeUnit timeUnit) {
-        final long start = System.currentTimeMillis();
-        final long timeoutMillis = timeUnit.toMillis(timeout);
-
-        // Saturate rather than overflow: a very large timeout (e.g. Long.MAX_VALUE) must not
-        // wrap around into a deadline that lies in the past.
-        final long maxTime = (timeoutMillis > Long.MAX_VALUE - start) ? Long.MAX_VALUE : start + timeoutMillis;
-
-        while (System.currentTimeMillis() < maxTime) {
-            synchronized (workMonitor) {
-                final Worker worker = workerQueue.poll();
-                if (worker != null) {
-                    // Decrement the amount of work there is to do for this worker.
-                    final int workLeft = worker.decrementEventCount();
-                    if (workLeft > 0) {
-                        workerQueue.offer(worker);
-                    }
-
-                    return worker;
-                }
-
-                // nothing to do. wait until we have something to do.
-                final long timeLeft = maxTime - System.currentTimeMillis();
-                if (timeLeft <= 0) {
-                    return null;
-                }
-
-                try {
-                    workMonitor.wait(timeLeft);
-                } catch (final InterruptedException e) {
-                    // Restore the interrupt status for the caller and stop waiting; looping again
-                    // would only make wait() throw immediately until the deadline passes.
-                    Thread.currentThread().interrupt();
-                    return null;
-                }
-            }
-        }
-
-        return null;
-    }
-
-    /**
-     * Records one event for the given component and wakes up one thread
-     * waiting in {@link #poll(long, TimeUnit)}. Events for components that
-     * are not currently registered via {@link #resumeWork(Connectable)} are
-     * ignored.
-     */
-    @Override
-    public void offer(final Connectable connectable) {
-        synchronized (workMonitor) {
-            final Worker worker = workerMap.get(connectable);
-            if (worker == null) {
-                // if worker is null, then it has not been scheduled to run; ignore the event.
-                return;
-            }
-
-            final int countBefore = worker.incrementEventCount();
-            if (countBefore < 0) {
-                // The counter had dropped below zero; this event is the only outstanding one.
-                worker.setWorkCount(1);
-            }
-            if (countBefore <= 0) {
-                // If countBefore > 0 then it's already on the queue, so just incrementing its counter is sufficient.
-                workerQueue.offer(worker);
-            }
-
-            workMonitor.notify();
-        }
-    }
-
-    /**
-     * @return the total object count over all incoming connections of the component
-     */
-    private int getWorkCount(final Connectable connectable) {
-        int sum = 0;
-        for (final Connection connection : connectable.getIncomingConnections()) {
-            sum += connection.getFlowFileQueue().size().getObjectCount();
-        }
-        return sum;
-    }
-
-    /**
-     * Registers a fresh {@link Worker} for the component, replacing any
-     * previous one. If its incoming connections already hold objects, the
-     * worker starts with that count and is queued right away.
-     */
-    @Override
-    public void resumeWork(final Connectable connectable) {
-        synchronized (workMonitor) {
-            final int workCount = getWorkCount(connectable);
-            final Worker worker = new Worker(connectable);
-            workerMap.put(connectable, worker);
-
-            if (workCount > 0) {
-                worker.setWorkCount(workCount);
-                workerQueue.offer(worker);
-                workMonitor.notify();
-            }
-        }
-    }
-
-    /**
-     * Unregisters the component: its worker is removed from the map and from
-     * the ready queue, and its event count is reset. Does nothing if the
-     * component is not registered.
-     */
-    @Override
-    public void suspendWork(final Connectable connectable) {
-        synchronized (workMonitor) {
-            final Worker worker = this.workerMap.remove(connectable);
-            if (worker == null) {
-                return;
-            }
-
-            worker.resetWorkCount();
-            workerQueue.remove(worker);
-        }
-    }
-
-    /**
-     * Pairs a {@link Connectable} with a counter of outstanding events.
-     */
-    public static class Worker implements EventBasedWorker {
-
-        private final Connectable connectable;
-        private final AtomicInteger workCount = new AtomicInteger(0);
-
-        public Worker(final Connectable connectable) {
-            this.connectable = connectable;
-        }
-
-        @Override
-        public Connectable getConnectable() {
-            return connectable;
-        }
-
-        /**
-         * @return the event count <em>after</em> decrementing
-         */
-        @Override
-        public int decrementEventCount() {
-            return workCount.decrementAndGet();
-        }
-
-        /**
-         * @return the event count <em>before</em> incrementing
-         */
-        @Override
-        public int incrementEventCount() {
-            return workCount.getAndIncrement();
-        }
-
-        void resetWorkCount() {
-            workCount.set(0);
-        }
-
-        void setWorkCount(final int workCount) {
-            this.workCount.set(workCount);
-        }
-    }
-
-    /**
-     * FIFO list of workers whose {@link #poll()} skips over workers that
-     * cannot run right now. The list itself is not synchronized; the
-     * enclosing class only touches its contents while holding its monitor.
-     */
-    @SuppressWarnings("serial")
-    private static class WorkerReadyQueue extends LinkedList<Worker> {
-
-        private final ProcessScheduler scheduler;
-
-        private volatile boolean clustered = false;
-        private volatile boolean primary = false;
-
-        public WorkerReadyQueue(final ProcessScheduler scheduler) {
-            this.scheduler = scheduler;
-        }
-
-        public void setClustered(final boolean clustered) {
-            this.clustered = clustered;
-        }
-
-        public void setPrimary(final boolean primary) {
-            this.primary = primary;
-        }
-
-        /**
-         * Removes and returns the first worker that has no reason to be
-         * delayed. Workers that are skipped are appended back to the end of
-         * the queue, except those whose component is not running: those are
-         * dropped from the queue and have their event count reset.
-         *
-         * @return a ready worker, or {@code null} if none is ready
-         */
-        @Override
-        public Worker poll() {
-            final List<Worker> putBack = new ArrayList<>();
-
-            Worker worker;
-            try {
-                while ((worker = super.poll()) != null) {
-                    final DelayProcessingReason reason = getDelayReason(worker);
-                    if (reason == null) {
-                        return worker;
-                    }
-
-                    // Worker is not ready. We may want to add him back to the queue, depending on the reason that he is unready.
-                    switch (reason) {
-                        case YIELDED:
-                        case ISOLATED:
-                        case DESTINATION_FULL:
-                        case ALL_WORK_PENALIZED:
-                        case NO_WORK:
-                        case TOO_MANY_THREADS:
-                            // there will not be an event that triggers this to happen, so we add this worker back to the queue.
-                            putBack.add(worker);
-                            break;
-                        default:
-                        case NOT_RUNNING:
-                            // There's no need to check if this worker is available again until another event
-                            // occurs. Therefore, we keep him off of the queue and reset his work count
-                            worker.resetWorkCount();
-                            break;
-                    }
-                }
-            } finally {
-                // Re-queue the skipped workers even when a ready worker is returned early.
-                if (!putBack.isEmpty()) {
-                    super.addAll(putBack);
-                }
-            }
-
-            return null;
-        }
-
-        /**
-         * Determines why the worker's component cannot be triggered right now.
-         *
-         * @return the first applicable reason, or {@code null} if the worker is ready
-         */
-        private DelayProcessingReason getDelayReason(final Worker worker) {
-            final Connectable connectable = worker.getConnectable();
-
-            if (ScheduledState.RUNNING != connectable.getScheduledState()) {
-                return DelayProcessingReason.NOT_RUNNING;
-            }
-
-            if (connectable.getYieldExpiration() > System.currentTimeMillis()) {
-                return DelayProcessingReason.YIELDED;
-            }
-
-            // Components that declare no relationships skip the destination check; all others
-            // need at least one relationship that can accept output.
-            int availableRelationshipCount = 0;
-            if (!connectable.getRelationships().isEmpty()) {
-                availableRelationshipCount = getAvailableRelationshipCount(connectable);
-
-                if (availableRelationshipCount == 0) {
-                    return DelayProcessingReason.DESTINATION_FULL;
-                }
-            }
-
-            if (connectable.hasIncomingConnection() && !Connectables.flowFilesQueued(connectable)) {
-                return DelayProcessingReason.NO_WORK;
-            }
-
-            final int activeThreadCount = scheduler.getActiveThreadCount(connectable);
-            final int maxThreadCount = connectable.getMaxConcurrentTasks();
-            if (maxThreadCount > 0 && activeThreadCount >= maxThreadCount) {
-                return DelayProcessingReason.TOO_MANY_THREADS;
-            }
-
-            if (connectable instanceof ProcessorNode) {
-                final ProcessorNode procNode = (ProcessorNode) connectable;
-                if (procNode.isIsolated() && clustered && !primary) {
-                    return DelayProcessingReason.ISOLATED;
-                }
-
-                // Unless the processor opts in to being triggered when any destination is
-                // available, every one of its relationships must be available.
-                final boolean triggerWhenAnyAvailable = procNode.isTriggerWhenAnyDestinationAvailable();
-                final boolean allDestinationsAvailable = availableRelationshipCount == procNode.getRelationships().size();
-                if (!triggerWhenAnyAvailable && !allDestinationsAvailable) {
-                    return DelayProcessingReason.DESTINATION_FULL;
-                }
-            }
-
-            return null;
-        }
-
-        /**
-         * Counts the relationships of the component that can currently accept
-         * output. A relationship without connections counts only if it is
-         * auto-terminated; a relationship with connections counts if none of
-         * its connections (self-loops excluded) has a full queue.
-         */
-        private int getAvailableRelationshipCount(final Connectable connectable) {
-            int count = 0;
-            for (final Relationship relationship : connectable.getRelationships()) {
-                final Collection<Connection> connections = connectable.getConnections(relationship);
-
-                if (connections == null || connections.isEmpty()) {
-                    if (connectable.isAutoTerminated(relationship)) {
-                        // If the relationship is auto-terminated, consider it available.
-                        count++;
-                    }
-                    continue;
-                }
-
-                boolean available = true;
-                for (final Connection connection : connections) {
-                    if (connection.getSource() == connection.getDestination()) {
-                        // don't count self-loops
-                        continue;
-                    }
-
-                    if (connection.getFlowFileQueue().isFull()) {
-                        // One full queue is enough to make the relationship unavailable.
-                        available = false;
-                        break;
-                    }
-                }
-
-                if (available) {
-                    count++;
-                }
-            }
-
-            return count;
-        }
-    }
-
-    /**
-     * Reasons for which a worker is not handed out by the ready queue.
-     */
-    private enum DelayProcessingReason {
-
-        YIELDED,
-        DESTINATION_FULL,
-        NO_WORK,
-        ALL_WORK_PENALIZED,
-        ISOLATED,
-        NOT_RUNNING,
-        TOO_MANY_THREADS;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
deleted file mode 100644
index e1d80b0..0000000
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ /dev/null
@@ -1,768 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
-
-import org.apache.nifi.controller.repository.ConnectionSwapInfo;
-import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.controller.repository.FlowFileRepository;
-import org.apache.nifi.controller.repository.FlowFileSwapManager;
-import org.apache.nifi.controller.repository.QueueProvider;
-import org.apache.nifi.controller.repository.StandardFlowFileRecord;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ContentClaimManager;
-import org.apache.nifi.engine.FlowEngine;
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.processor.QueueSize;
-import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <p>
- * An implementation of the {@link FlowFileSwapManager} that swaps FlowFiles
- * to/from local disk
- * </p>
- */
-public class FileSystemSwapManager implements FlowFileSwapManager {
-
-    public static final int MINIMUM_SWAP_COUNT = 10000;
-    private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");   // file names of the form <digits>-<anything>.swap
-    private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part");   // same form with a trailing ".part"
-    
-    public static final int SWAP_ENCODING_VERSION = 6;   // written as the first int of a swap file; files with a higher version are rejected on read
-    public static final String EVENT_CATEGORY = "Swap FlowFiles";
-
-    private final ScheduledExecutorService swapQueueIdentifierExecutor;
-    private final ScheduledExecutorService swapInExecutor;
-    private volatile FlowFileRepository flowFileRepository;
-    private volatile EventReporter eventReporter;
-
-    // Maintains a mapping of FlowFile Queue to a QueueLockWrapper, which provides queue locking and the state needed for swapping back in
-    private final ConcurrentMap<FlowFileQueue, QueueLockWrapper> swapMap = new ConcurrentHashMap<>();
-    private final File storageDirectory;
-    private final long swapInMillis;
-    private final long swapOutMillis;
-    private final int swapOutThreadCount;
-
-    private ContentClaimManager claimManager;	// effectively final; assigned in start()
-
-    private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class);
-
-    /**
-     * Creates a swap manager configured from {@link NiFiProperties}: swap
-     * files live in a "swap" directory below the FlowFile Repository path,
-     * which is created if it does not exist yet.
-     *
-     * @throws RuntimeException if the swap directory does not exist and cannot be created
-     */
-    public FileSystemSwapManager() {
-        final NiFiProperties nifiProps = NiFiProperties.getInstance();
-        final Path repositoryPath = nifiProps.getFlowFileRepositoryPath();
-
-        final File swapDirectory = repositoryPath.resolve("swap").toFile();
-        this.storageDirectory = swapDirectory;
-        final boolean directoryAvailable = swapDirectory.exists() || swapDirectory.mkdirs();
-        if (!directoryAvailable) {
-            throw new RuntimeException("Cannot create Swap Storage directory " + swapDirectory.getAbsolutePath());
-        }
-
-        // A single thread periodically identifies the queues to swap out.
-        this.swapQueueIdentifierExecutor = new FlowEngine(1, "Identifies Queues for FlowFile Swapping");
-
-        final String swapInPeriod = nifiProps.getSwapInPeriod();
-        this.swapInMillis = FormatUtils.getTimeDuration(swapInPeriod, TimeUnit.MILLISECONDS);
-        final String swapOutPeriod = nifiProps.getSwapOutPeriod();
-        this.swapOutMillis = FormatUtils.getTimeDuration(swapOutPeriod, TimeUnit.MILLISECONDS);
-        this.swapOutThreadCount = nifiProps.getSwapOutThreads();
-
-        final int swapInThreads = nifiProps.getSwapInThreads();
-        this.swapInExecutor = new FlowEngine(swapInThreads, "Swap In FlowFiles");
-    }
-
-    /**
-     * Deletes every file in the swap directory whose name matches the swap
-     * file pattern. Files that cannot be deleted are logged at WARN level.
-     */
-    @Override
-    public void purge() {
-        final FilenameFilter swapFilesOnly = new FilenameFilter() {
-            @Override
-            public boolean accept(final File dir, final String name) {
-                return SWAP_FILE_PATTERN.matcher(name).matches();
-            }
-        };
-
-        final File[] candidates = storageDirectory.listFiles(swapFilesOnly);
-        if (candidates == null) {
-            // listFiles returned nothing to iterate over.
-            return;
-        }
-
-        for (final File candidate : candidates) {
-            final boolean removed = candidate.delete();
-            if (removed || !candidate.exists()) {
-                continue;
-            }
-
-            logger.warn("Failed to delete SWAP file {}", candidate);
-        }
-    }
-
-    /**
-     * Stores the collaborators and schedules the two periodic background
-     * tasks: a {@link QueueIdentifier} every {@code swapOutMillis} and a
-     * {@link SwapInTask} every {@code swapInMillis} (fixed delay, with the
-     * same initial delay).
-     *
-     * @param flowFileRepository repository that is informed when FlowFiles are swapped in
-     * @param connectionProvider supplies the queues that are examined for swapping out
-     * @param claimManager used to create content claims while deserializing swap files
-     * @param eventReporter stored for later use by this manager
-     */
-    public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager, final EventReporter eventReporter) {
-        this.claimManager = claimManager;
-        this.flowFileRepository = flowFileRepository;
-        this.eventReporter = eventReporter;
-
-        final Runnable swapOutScan = new QueueIdentifier(connectionProvider);
-        swapQueueIdentifierExecutor.scheduleWithFixedDelay(swapOutScan, swapOutMillis, swapOutMillis, TimeUnit.MILLISECONDS);
-
-        final Runnable swapInScan = new SwapInTask();
-        swapInExecutor.scheduleWithFixedDelay(swapInScan, swapInMillis, swapInMillis, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Writes the given FlowFiles to {@code destination} in the swap file
-     * format.
-     * <p>
-     * Layout: encoding version (int), queue identifier (UTF), record count
-     * (int), total content size (long), followed by one entry per FlowFile
-     * (id, entry date, lineage identifiers, lineage start date, last queue
-     * date, size, optional content claim, attributes).
-     * </p>
-     * <p>
-     * The stream is flushed but not closed. If writing fails, a flush is still
-     * attempted; a failure of that flush is attached to the original exception
-     * as a suppressed exception instead of replacing it.
-     * </p>
-     *
-     * @param toSwap the FlowFiles to write; may be {@code null} or empty
-     * @param queue the queue the FlowFiles belong to; its identifier is written to the header
-     * @param swapLocation only used in the log message
-     * @param destination the stream to write to
-     * @return the number of FlowFiles written, {@code 0} if {@code toSwap} is {@code null} or empty
-     * @throws IOException if writing to {@code destination} fails
-     */
-    public int serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException {
-        if (toSwap == null || toSwap.isEmpty()) {
-            return 0;
-        }
-
-        long contentSize = 0L;
-        for (final FlowFileRecord record : toSwap) {
-            contentSize += record.getSize();
-        }
-
-        // persist record to disk via the swap file
-        final OutputStream bufferedOut = new BufferedOutputStream(destination);
-        final DataOutputStream out = new DataOutputStream(bufferedOut);
-        try {
-            out.writeInt(SWAP_ENCODING_VERSION);
-            out.writeUTF(queue.getIdentifier());
-            out.writeInt(toSwap.size());
-            out.writeLong(contentSize);
-
-            for (final FlowFileRecord flowFile : toSwap) {
-                out.writeLong(flowFile.getId());
-                out.writeLong(flowFile.getEntryDate());
-
-                final Set<String> lineageIdentifiers = flowFile.getLineageIdentifiers();
-                out.writeInt(lineageIdentifiers.size());
-                for (final String lineageId : lineageIdentifiers) {
-                    out.writeUTF(lineageId);
-                }
-
-                out.writeLong(flowFile.getLineageStartDate());
-                out.writeLong(flowFile.getLastQueueDate());
-                out.writeLong(flowFile.getSize());
-
-                // A boolean marks whether a content claim follows.
-                final ContentClaim claim = flowFile.getContentClaim();
-                if (claim == null) {
-                    out.writeBoolean(false);
-                } else {
-                    out.writeBoolean(true);
-                    out.writeUTF(claim.getId());
-                    out.writeUTF(claim.getContainer());
-                    out.writeUTF(claim.getSection());
-                    out.writeLong(flowFile.getContentClaimOffset());
-                    out.writeBoolean(claim.isLossTolerant());
-                }
-
-                // Attributes use the custom length-prefixed encoding because values may be longer
-                // than the 2-byte length prefix of writeUTF allows.
-                final Map<String, String> attributes = flowFile.getAttributes();
-                out.writeInt(attributes.size());
-                for (final Map.Entry<String, String> entry : attributes.entrySet()) {
-                    writeString(entry.getKey(), out);
-                    writeString(entry.getValue(), out);
-                }
-            }
-        } catch (final IOException | RuntimeException e) {
-            // Still push out whatever was buffered, but never let a flush failure hide the
-            // exception that actually caused the write to fail.
-            try {
-                out.flush();
-            } catch (final IOException flushFailure) {
-                e.addSuppressed(flushFailure);
-            }
-            throw e;
-        }
-
-        out.flush();
-
-        logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", new Object[]{toSwap.size(), queue, swapLocation});
-
-        return toSwap.size();
-    }
-
-    /**
-     * Writes a string as a length prefix followed by its UTF-8 bytes.
-     * <p>
-     * Lengths below 65535 use a 2-byte big-endian prefix. Longer strings are
-     * written as the marker bytes 0xFF 0xFF followed by a 4-byte big-endian
-     * length.
-     * </p>
-     *
-     * @param toWrite the string to write
-     * @param out the stream to write to
-     * @throws IOException if writing fails
-     */
-    private void writeString(final String toWrite, final OutputStream out) throws IOException {
-        final byte[] encoded = toWrite.getBytes("UTF-8");
-        final int encodedLength = encoded.length;
-
-        final boolean needsLongPrefix = encodedLength >= 65535;
-        if (needsLongPrefix) {
-            // marker that tells the reader a 4-byte length follows
-            out.write(255);
-            out.write(255);
-            out.write(encodedLength >>> 24);
-            out.write(encodedLength >>> 16);
-        }
-
-        // low two bytes of the length are written in both forms
-        out.write(encodedLength >>> 8);
-        out.write(encodedLength);
-        out.write(encoded);
-    }
-
-    /**
-     * Reads a complete swap file: validates the header and then reads all
-     * records without incrementing content claim counts.
-     *
-     * @param in stream positioned at the start of the swap file
-     * @param queue the queue the records are expected to belong to
-     * @param claimManager used to create the content claims of the records
-     * @return the records contained in the swap file
-     * @throws IOException if the encoding version is newer than {@link #SWAP_ENCODING_VERSION} or reading fails
-     * @throws IllegalArgumentException if the file was written for a different queue identifier
-     */
-    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final FlowFileQueue queue, final ContentClaimManager claimManager) throws IOException {
-        final int encodingVersion = in.readInt();
-        final boolean tooNew = encodingVersion > SWAP_ENCODING_VERSION;
-        if (tooNew) {
-            throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is "
-                    + encodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)");
-        }
-
-        final String recordedConnectionId = in.readUTF();
-        final boolean sameConnection = recordedConnectionId.equals(queue.getIdentifier());
-        if (!sameConnection) {
-            throw new IllegalArgumentException("Cannot restore Swap File because the file indicates that records belong to Connection with ID " + recordedConnectionId + " but received Connection " + queue);
-        }
-
-        final int recordCount = in.readInt();
-
-        // The total content size is part of the header but is not needed here; skip over it.
-        in.readLong();
-
-        return deserializeFlowFiles(in, recordCount, queue, encodingVersion, false, claimManager);
-    }
-
-    /**
-     * Reads {@code numFlowFiles} records from the stream, honoring the field
-     * layout of the given serialization version.
-     * <p>
-     * Version differences handled here: versions before 3 carry a leading
-     * action byte, a connection id after the size and an "attributes changed"
-     * flag; lineage information exists from version 2 on; the loss-tolerant
-     * flag from version 4 on; claim ids are strings from version 5 on
-     * (a long before that); the last queue date exists from version 6 on.
-     * </p>
-     *
-     * @param in stream positioned at the first record
-     * @param numFlowFiles number of records to read
-     * @param queue not used by this method
-     * @param serializationVersion encoding version the records were written with
-     * @param incrementContentClaims whether to increment the claimant count of each content claim read
-     * @param claimManager used to create (and optionally increment) content claims
-     * @return the records read, in stream order
-     * @throws IOException if reading fails or a legacy record does not start with the expected action byte
-     */
-    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles, final FlowFileQueue queue, final int serializationVersion, final boolean incrementContentClaims, final ContentClaimManager claimManager) throws IOException {
-        final boolean legacyLayout = serializationVersion < 3;
-        final boolean hasLineage = serializationVersion > 1;
-        final boolean hasLastQueueDate = serializationVersion > 5;
-        final boolean numericClaimId = serializationVersion < 5;
-        final boolean hasLossTolerantFlag = serializationVersion >= 4;
-
-        final List<FlowFileRecord> records = new ArrayList<>();
-        for (int recordIdx = 0; recordIdx < numFlowFiles; recordIdx++) {
-            // legacy encoding had an "action" because it used to be coupled with FlowFile Repository code
-            if (legacyLayout) {
-                final int recordType = in.read();
-                if (recordType != 1) {
-                    throw new IOException("Swap File is version " + serializationVersion + " but did not contain a 'UPDATE' record type");
-                }
-            }
-
-            final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder();
-            builder.id(in.readLong());
-            builder.entryDate(in.readLong());
-
-            if (hasLineage) {
-                // Lineage information was added in version 2
-                final int lineageCount = in.readInt();
-                final Set<String> lineageIds = new HashSet<>(lineageCount);
-                int remaining = lineageCount;
-                while (remaining-- > 0) {
-                    lineageIds.add(in.readUTF());
-                }
-                builder.lineageIdentifiers(lineageIds);
-                builder.lineageStartDate(in.readLong());
-
-                if (hasLastQueueDate) {
-                    builder.lastQueueDate(in.readLong());
-                }
-            }
-
-            builder.size(in.readLong());
-
-            if (legacyLayout) {
-                // connection Id; read only to advance the stream
-                readString(in);
-            }
-
-            if (in.readBoolean()) {
-                // A content claim follows.
-                final String claimId = numericClaimId ? String.valueOf(in.readLong()) : in.readUTF();
-                final String claimContainer = in.readUTF();
-                final String claimSection = in.readUTF();
-                final long offset = in.readLong();
-                final boolean lossTolerant = hasLossTolerantFlag ? in.readBoolean() : false;
-
-                final ContentClaim contentClaim = claimManager.newContentClaim(claimContainer, claimSection, claimId, lossTolerant);
-                if (incrementContentClaims) {
-                    claimManager.incrementClaimantCount(contentClaim);
-                }
-
-                builder.contentClaim(contentClaim);
-                builder.contentClaimOffset(offset);
-            }
-
-            // Only the legacy layout stores a flag telling whether attributes are present.
-            final boolean attributesPresent = legacyLayout ? in.readBoolean() : true;
-            if (attributesPresent) {
-                final int attributeCount = in.readInt();
-                for (int attrIdx = 0; attrIdx < attributeCount; attrIdx++) {
-                    final String attributeName = readString(in);
-                    final String attributeValue = readString(in);
-                    builder.addAttribute(attributeName, attributeValue);
-                }
-            }
-
-            records.add(builder.build());
-        }
-
-        return records;
-    }
-
-    /**
-     * Reads a length-prefixed UTF-8 string (see {@code readFieldLength} for
-     * the prefix format).
-     *
-     * @param in the stream to read from
-     * @return the decoded string
-     * @throws EOFException if the stream ends before the length prefix or before all bytes are read
-     * @throws IOException if the length prefix is negative (which indicates corrupt data) or reading fails
-     */
-    private static String readString(final InputStream in) throws IOException {
-        final Integer numBytes = readFieldLength(in);
-        if (numBytes == null) {
-            throw new EOFException();
-        }
-        if (numBytes < 0) {
-            // A 4-byte length with the high bit set decodes to a negative int. Report it as an
-            // I/O problem instead of failing with an unchecked NegativeArraySizeException.
-            throw new IOException("Cannot read String because the encoded length is negative (" + numBytes + "); the data appears to be corrupt");
-        }
-
-        final byte[] bytes = new byte[numBytes];
-        fillBuffer(in, bytes, numBytes);
-        return new String(bytes, "UTF-8");
-    }
-
-    /**
-     * Reads a length prefix: two bytes forming a big-endian value, or, when
-     * both of those bytes are 0xFF, four further bytes forming a big-endian
-     * int.
-     *
-     * @param in the stream to read from
-     * @return the decoded length, or {@code null} if the stream was already at its end
-     * @throws EOFException if the stream ends in the middle of the prefix
-     * @throws IOException if reading fails
-     */
-    private static Integer readFieldLength(final InputStream in) throws IOException {
-        final int highByte = in.read();
-        final int lowByte = in.read();
-        if (highByte < 0) {
-            // Nothing left to read at all.
-            return null;
-        }
-        if (lowByte < 0) {
-            throw new EOFException();
-        }
-
-        final boolean extendedLength = (highByte == 0xff) && (lowByte == 0xff);
-        if (!extendedLength) {
-            return (highByte << 8) + lowByte;
-        }
-
-        final int b1 = in.read();
-        final int b2 = in.read();
-        final int b3 = in.read();
-        final int b4 = in.read();
-        // read() yields -1 at end of stream, so a negative OR means at least one byte was missing
-        if ((b1 | b2 | b3 | b4) < 0) {
-            throw new EOFException();
-        }
-        return (b1 << 24) + (b2 << 16) + (b3 << 8) + b4;
-    }
-
-    private static void fillBuffer(final InputStream in, final byte[] buffer, final int length) throws IOException {
-        int bytesRead;
-        int totalBytesRead = 0;
-        while ((bytesRead = in.read(buffer, totalBytesRead, length - totalBytesRead)) > 0) {
-            totalBytesRead += bytesRead;
-        }
-        if (totalBytesRead != length) {
-            throw new EOFException();
-        }
-    }
-
-    /**
-     * Task that takes a snapshot of all queues and lets a short-lived pool of
-     * {@code swapOutThreadCount} {@link SwapOutTask}s work through it. Each
-     * run creates its own pool, shuts it down after submitting the tasks and
-     * waits up to ten minutes for them to finish.
-     */
-    private class QueueIdentifier implements Runnable {
-
-        private final QueueProvider connectionProvider;
-
-        public QueueIdentifier(final QueueProvider connectionProvider) {
-            this.connectionProvider = connectionProvider;
-        }
-
-        @Override
-        public void run() {
-            final Collection<FlowFileQueue> allQueues = connectionProvider.getAllQueues();
-            // All workers of this run draw from the same queue of connections.
-            final BlockingQueue<FlowFileQueue> connectionQueue = new LinkedBlockingQueue<>(allQueues);
-
-            final ThreadFactory threadFactory = new ThreadFactory() {
-                @Override
-                public Thread newThread(final Runnable r) {
-                    final Thread t = new Thread(r);
-                    t.setName("Swap Out FlowFiles");
-                    return t;
-                }
-            };
-
-            final ExecutorService workerExecutor = Executors.newFixedThreadPool(swapOutThreadCount, threadFactory);
-            for (int i = 0; i < swapOutThreadCount; i++) {
-                workerExecutor.submit(new SwapOutTask(connectionQueue));
-            }
-
-            // No further tasks will be submitted; already submitted tasks still run to completion.
-            workerExecutor.shutdown();
-
-            try {
-                workerExecutor.awaitTermination(10, TimeUnit.MINUTES);
-            } catch (final InterruptedException e) {
-                // Stop waiting, but keep the interrupt status set so that the code running this
-                // task can still observe that the thread was interrupted.
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-
-    /**
-     * Task that walks every entry of {@code swapMap} and, for queues that have room,
-     * reads swap files back from disk and returns their FlowFiles to the queue.
-     */
-    private class SwapInTask implements Runnable {
-
-        @Override
-        public void run() {
-            for (final Map.Entry<FlowFileQueue, QueueLockWrapper> entry : swapMap.entrySet()) {
-                final FlowFileQueue flowFileQueue = entry.getKey();
-
-                // if queue is more than 60% of its swap threshold, don't swap flowfiles in
-                if (flowFileQueue.unswappedSize() >= ((float) flowFileQueue.getSwapThreshold() * 0.6F)) {
-                    continue;
-                }
-
-                final QueueLockWrapper queueLockWrapper = entry.getValue();
-                // Non-blocking attempt: if the lock is held elsewhere, this queue is skipped for this run.
-                if (queueLockWrapper.getLock().tryLock()) {
-                    try {
-                        final Queue<File> queue = queueLockWrapper.getQueue();
-
-                        // Swap FlowFiles in until we hit 90% of the threshold, or until we're out of files.
-                        while (flowFileQueue.unswappedSize() < ((float) flowFileQueue.getSwapThreshold() * 0.9F)) {
-                            File swapFile = null;
-                            try {
-                                swapFile = queue.poll();
-                                if (swapFile == null) {
-                                    break;
-                                }
-
-                                // Deserialize, record the swap-in with the repository, then hand the records to the queue.
-                                try (final InputStream fis = new FileInputStream(swapFile);
-                                        final DataInputStream in = new DataInputStream(fis)) {
-                                    final List<FlowFileRecord> swappedFlowFiles = deserializeFlowFiles(in, flowFileQueue, claimManager);
-                                    flowFileRepository.swapFlowFilesIn(swapFile.getAbsolutePath(), swappedFlowFiles, flowFileQueue);
-                                    flowFileQueue.putSwappedRecords(swappedFlowFiles);
-                                }
-
-                                // The file is only deleted after the stream has been closed and the records handed over.
-                                if (!swapFile.delete()) {
-                                    warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually");
-                                }
-                            } catch (final EOFException eof) {
-                                // A premature end of file is treated as a corrupt swap file: it is deleted and not re-queued.
-                                error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Corrupt Swap File; will remove this Swap File: " + swapFile);
-                                
-                                if ( !swapFile.delete() ) {
-                                    warn("Failed to remove corrupt Swap File " + swapFile + "; This file should be cleaned up manually");
-                                }
-                            } catch (final FileNotFoundException fnfe) {
-                                // The missing file is reported and dropped from the queue (it is not re-added).
-                                error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Could not find Swap File " + swapFile);
-                            } catch (final Exception e) {
-                                error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e, e);
-                                
-                                // Any other failure puts the file back on the queue so it can be retried.
-                                // NOTE(review): the loop continues right after re-queueing; if the failure is
-                                // persistent and the queue never fills, this may retry repeatedly while
-                                // holding the lock - confirm whether a break is wanted here.
-                                if (swapFile != null) {
-                                    queue.add(swapFile);
-                                }
-                            }
-                        }
-                    } finally {
-                        queueLockWrapper.getLock().unlock();
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Reports an error message and, only when debug logging is enabled, additionally logs
-     * the stack trace of the given cause.
-     *
-     * @param error description of the failure
-     * @param t cause of the failure; logged with an empty message if debug is enabled
-     */
-    private void error(final String error, final Throwable t) {
-        error(error);
-
-        final boolean includeStackTrace = logger.isDebugEnabled();
-        if (includeStackTrace) {
-            logger.error("", t);
-        }
-    }
-    
-    /**
-     * Logs the message at error level and, if an event reporter is present, reports it
-     * as an ERROR event as well.
-     *
-     * @param error description of the failure
-     */
-    private void error(final String error) {
-        logger.error(error);
-
-        if (eventReporter == null) {
-            return;
-        }
-        eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, error);
-    }
-    
-    /**
-     * Logs the message at warn level and, if an event reporter is present, reports it
-     * as a WARNING event as well.
-     *
-     * @param warning description of the condition
-     */
-    private void warn(final String warning) {
-        logger.warn(warning);
-
-        if (eventReporter == null) {
-            return;
-        }
-        eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, warning);
-    }
-    
-    
-    /**
-     * Worker that pulls FlowFile queues from a shared work queue and, for each one, writes
-     * batches of swappable FlowFiles to swap files for as long as the queue's swap queue
-     * holds at least {@code MINIMUM_SWAP_COUNT} records.
-     *
-     * Each batch is written to a ".part" temp file, synced to disk, and then renamed to its
-     * final ".swap" name before the repository is told about the swap-out.
-     */
-    private class SwapOutTask implements Runnable {
-        private final BlockingQueue<FlowFileQueue> connectionQueue;
-
-        /**
-         * @param connectionQueue shared work queue of FlowFile queues still to be examined
-         */
-        public SwapOutTask(final BlockingQueue<FlowFileQueue> connectionQueue) {
-            this.connectionQueue = connectionQueue;
-        }
-
-        @Override
-        public void run() {
-            while (true) {
-                // poll() does not block: an empty work queue means this worker is done.
-                final FlowFileQueue flowFileQueue = connectionQueue.poll();
-                if (flowFileQueue == null) {
-                    logger.debug("No more FlowFile Queues to Swap Out");
-                    return;
-                }
-
-                if (logger.isDebugEnabled()) {
-                    logger.debug("{} has {} FlowFiles to swap out", flowFileQueue, flowFileQueue.getSwapQueueSize());
-                }
-
-                while (flowFileQueue.getSwapQueueSize() >= MINIMUM_SWAP_COUNT) {
-                    final File swapFile = new File(storageDirectory, System.currentTimeMillis() + "-" + UUID.randomUUID().toString() + ".swap");
-                    final File swapTempFile = new File(swapFile.getParentFile(), swapFile.getName() + ".part");
-                    final String swapLocation = swapFile.getAbsolutePath();
-                    final List<FlowFileRecord> toSwap = flowFileQueue.pollSwappableRecords();
-
-                    int recordsSwapped;
-                    try {
-                        try (final FileOutputStream fos = new FileOutputStream(swapTempFile)) {
-                            recordsSwapped = serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
-                            // Force the bytes to disk before the file is renamed into place.
-                            fos.getFD().sync();
-                        }
-
-                        if ( swapTempFile.renameTo(swapFile) ) {
-                            flowFileRepository.swapFlowFilesOut(toSwap, flowFileQueue, swapLocation);
-                        } else {
-                            // The temp file is deleted below, so the polled records must be handed
-                            // back to the queue here (as the IOException path does); otherwise the
-                            // FlowFiles would exist neither in the queue nor on disk.
-                            recordsSwapped = 0;
-                            flowFileQueue.putSwappedRecords(toSwap);
-                            error("Failed to swap out FlowFiles from " + flowFileQueue + " due to: Unable to rename swap file from " + swapTempFile + " to " + swapFile);
-                        }
-                    } catch (final IOException ioe) {
-                        recordsSwapped = 0;
-                        flowFileQueue.putSwappedRecords(toSwap);
-                        error("Failed to swap out " + toSwap.size() + " FlowFiles from " + flowFileQueue + " to Swap File " + swapLocation + " due to " + ioe, ioe);
-                    }
-
-                    if (recordsSwapped > 0) {
-                        // Register the new swap file; putIfAbsent keeps whichever wrapper won a concurrent insert.
-                        QueueLockWrapper swapQueue = swapMap.get(flowFileQueue);
-                        if (swapQueue == null) {
-                            swapQueue = new QueueLockWrapper(new LinkedBlockingQueue<File>());
-                            QueueLockWrapper oldQueue = swapMap.putIfAbsent(flowFileQueue, swapQueue);
-                            if (oldQueue != null) {
-                                swapQueue = oldQueue;
-                            }
-                        }
-
-                        swapQueue.getQueue().add(swapFile);
-                    } else {
-                        // Nothing was swapped out; remove the partial file if it is still there.
-                        swapTempFile.delete();
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Recovers FlowFiles from all Swap Files, returning the largest FlowFile ID
-     * that was recovered.
-     *
-     * Leftover temporary swap files are deleted. Each remaining swap file is read to
-     * determine the connection it belongs to, its record count and content size; these are
-     * registered with the matching queue via {@link #restoreSwapLocations}. The records are
-     * deserialized here only to determine the largest FlowFile ID.
-     *
-     * @param queueProvider supplies the queues that swap files are matched against by identifier
-     * @param claimManager passed through to the deserialization of the swapped FlowFiles
-     * @return the largest FlowFile ID found in any readable swap file, or 0 if the storage
-     *         directory could not be listed or no records were recovered
-     */
-    @Override
-    public long recoverSwappedFlowFiles(final QueueProvider queueProvider, final ContentClaimManager claimManager) {
-        final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() {
-            @Override
-            public boolean accept(final File dir, final String name) {
-                return SWAP_FILE_PATTERN.matcher(name).matches() || TEMP_SWAP_FILE_PATTERN.matcher(name).matches();
-            }
-        });
-
-        // listFiles returns null when the directory cannot be listed.
-        if (swapFiles == null) {
-            return 0L;
-        }
-
-        final Collection<FlowFileQueue> allQueues = queueProvider.getAllQueues();
-        final Map<String, FlowFileQueue> queueMap = new HashMap<>();
-        for (final FlowFileQueue queue : allQueues) {
-            queueMap.put(queue.getIdentifier(), queue);
-        }
-
-        final ConnectionSwapInfo swapInfo = new ConnectionSwapInfo();
-        int swappedCount = 0;
-        long swappedBytes = 0L;
-        long maxRecoveredId = 0L;
-
-        for (final File swapFile : swapFiles) {
-            // Temporary files are swap-outs that never got renamed into place; discard them.
-            if ( TEMP_SWAP_FILE_PATTERN.matcher(swapFile.getName()).matches() ) {
-                if ( swapFile.delete() ) {
-                    logger.info("Removed incomplete/temporary Swap File " + swapFile);
-                } else {
-                    warn("Failed to remove incomplete/temporary Swap File " + swapFile + "; this file should be cleaned up manually");
-                }
-
-                continue;
-            }
-
-            // read the records from disk via the swap file
-            try (final InputStream fis = new FileInputStream(swapFile);
-                    final InputStream bufferedIn = new BufferedInputStream(fis);
-                    final DataInputStream in = new DataInputStream(bufferedIn)) {
-
-                final int swapEncodingVersion = in.readInt();
-                if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
-                    final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is "
-                            + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)";
-
-                    // Guard against a missing reporter, as error() and warn() do; an unguarded call
-                    // would throw a NullPointerException that the IOException handler below does not catch.
-                    if ( eventReporter != null ) {
-                        eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
-                    }
-                    throw new IOException(errMsg);
-                }
-
-                final String connectionId = in.readUTF();
-                final FlowFileQueue queue = queueMap.get(connectionId);
-                if (queue == null) {
-                    error("Cannot recover Swapped FlowFiles from Swap File " + swapFile + " because the FlowFiles belong to a Connection with ID " + connectionId + " and that Connection does not exist");
-                    continue;
-                }
-
-                final int numRecords = in.readInt();
-                final long contentSize = in.readLong();
-
-                swapInfo.addSwapSizeInfo(connectionId, swapFile.getAbsolutePath(), new QueueSize(numRecords, contentSize));
-                swappedCount += numRecords;
-                swappedBytes += contentSize;
-
-                // The records are only inspected for their IDs; they are not added to the queue here.
-                final List<FlowFileRecord> records = deserializeFlowFiles(in, numRecords, queue, swapEncodingVersion, true, claimManager);
-                for (final FlowFileRecord record : records) {
-                    maxRecoveredId = Math.max(maxRecoveredId, record.getId());
-                }
-            } catch (final IOException ioe) {
-                error("Cannot recover Swapped FlowFiles from Swap File " + swapFile + " due to " + ioe, ioe);
-            }
-        }
-
-        restoreSwapLocations(queueMap.values(), swapInfo);
-        logger.info("Recovered {} FlowFiles ({} bytes) from Swap Files", swappedCount, swappedBytes);
-        return maxRecoveredId;
-    }
-
-    /**
-     * Registers previously written swap files with their queues.
-     *
-     * For every queue that has swap file locations in {@code swapInfo}, the locations are
-     * ordered with {@link SwapFileComparator}, appended to that queue's entry in
-     * {@code swapMap}, and the queue's swap count is incremented by each file's size.
-     *
-     * @param flowFileQueues queues to restore swap locations for
-     * @param swapInfo swap file locations and sizes, looked up by queue identifier
-     */
-    public void restoreSwapLocations(final Collection<FlowFileQueue> flowFileQueues, final ConnectionSwapInfo swapInfo) {
-        for (final FlowFileQueue queue : flowFileQueues) {
-            final String queueId = queue.getIdentifier();
-            final Collection<String> swapFileLocations = swapInfo.getSwapFileLocations(queueId);
-            if (swapFileLocations == null || swapFileLocations.isEmpty()) {
-                continue;
-            }
-
-            final SortedMap<String, QueueSize> sortedFileQueueMap = new TreeMap<>(new SwapFileComparator());
-            for (final String swapFileLocation : swapFileLocations) {
-                final QueueSize queueSize = swapInfo.getSwappedSize(queueId, swapFileLocation);
-                sortedFileQueueMap.put(swapFileLocation, queueSize);
-            }
-
-            // Insert atomically, the same way SwapOutTask does, so a wrapper registered
-            // concurrently by another thread is reused rather than overwritten.
-            QueueLockWrapper fileQueue = swapMap.get(queue);
-            if (fileQueue == null) {
-                fileQueue = new QueueLockWrapper(new LinkedBlockingQueue<File>());
-                final QueueLockWrapper existing = swapMap.putIfAbsent(queue, fileQueue);
-                if (existing != null) {
-                    fileQueue = existing;
-                }
-            }
-
-            for (final Map.Entry<String, QueueSize> innerEntry : sortedFileQueueMap.entrySet()) {
-                final File swapFile = new File(innerEntry.getKey());
-                final QueueSize size = innerEntry.getValue();
-                fileQueue.getQueue().add(swapFile);
-                queue.incrementSwapCount(size.getObjectCount(), size.getByteCount());
-            }
-        }
-    }
-
-    /**
-     * Stops background swapping by calling {@code shutdownNow()} on the swap-out queue
-     * identifier executor and then on the swap-in executor. This method does not wait
-     * for either executor to terminate.
-     */
-    public void shutdown() {
-        swapQueueIdentifierExecutor.shutdownNow();
-        swapInExecutor.shutdownNow();
-    }
-
-    /**
-     * Orders swap file locations by the millisecond timestamp that prefixes the file name
-     * (the text before the first "-"), oldest first. Locations with the same timestamp are
-     * ordered by their full string. Locations without a parsable timestamp sort after all
-     * timestamped ones and are ordered among themselves by their full string, so that two
-     * distinct locations never compare as equal (which would make a sorted map drop one).
-     */
-    private static class SwapFileComparator implements Comparator<String> {
-
-        @Override
-        public int compare(final String o1, final String o2) {
-            // Identity shortcut; also covers the case where both are null.
-            if (o1 == o2) {
-                return 0;
-            }
-
-            final Long time1 = getTimestampFromFilename(o1);
-            final Long time2 = getTimestampFromFilename(o2);
-
-            if (time1 == null && time2 == null) {
-                return compareNames(o1, o2);
-            }
-            if (time1 == null) {
-                return 1;
-            }
-            if (time2 == null) {
-                return -1;
-            }
-
-            final int timeComparisonValue = time1.compareTo(time2);
-            if (timeComparisonValue != 0) {
-                return timeComparisonValue;
-            }
-
-            // Both timestamps parsed, so neither location is null here.
-            return o1.compareTo(o2);
-        }
-
-        /**
-         * Compares two locations by their full string, placing a null location last.
-         */
-        private int compareNames(final String o1, final String o2) {
-            if (o1 == null) {
-                return 1;
-            }
-            if (o2 == null) {
-                return -1;
-            }
-            return o1.compareTo(o2);
-        }
-
-        /**
-         * Extracts the leading timestamp from the file-name part of the given location.
-         *
-         * @param fullyQualifiedFilename location of the swap file; may be null
-         * @return the parsed timestamp, or null if the location is null, its file name has no
-         *         "-" after at least one character, or the prefix is not a valid long
-         */
-        private Long getTimestampFromFilename(final String fullyQualifiedFilename) {
-            if (fullyQualifiedFilename == null) {
-                return null;
-            }
-
-            final File file = new File(fullyQualifiedFilename);
-            final String filename = file.getName();
-
-            final int idx = filename.indexOf("-");
-            if (idx < 1) {
-                return null;
-            }
-
-            final String millisVal = filename.substring(0, idx);
-            try {
-                return Long.parseLong(millisVal);
-            } catch (final NumberFormatException e) {
-                return null;
-            }
-        }
-    }
-
-    /**
-     * Pairs a queue of swap files with the lock that guards swapping from it.
-     * Equality and hash code are delegated to the wrapped queue; the lock plays no part.
-     */
-    private static class QueueLockWrapper {
-
-        private final Queue<File> queue;
-        private final Lock lock;
-
-        /**
-         * @param queue the swap file queue to wrap
-         */
-        public QueueLockWrapper(final Queue<File> queue) {
-            this.queue = queue;
-            this.lock = new ReentrantLock();
-        }
-
-        /** @return the lock associated with the wrapped queue */
-        public Lock getLock() {
-            return lock;
-        }
-
-        /** @return the wrapped swap file queue */
-        public Queue<File> getQueue() {
-            return queue;
-        }
-
-        @Override
-        public boolean equals(final Object obj) {
-            if (!(obj instanceof QueueLockWrapper)) {
-                return false;
-            }
-
-            final QueueLockWrapper other = (QueueLockWrapper) obj;
-            return queue.equals(other.queue);
-        }
-
-        @Override
-        public int hashCode() {
-            return queue.hashCode();
-        }
-    }
-}