You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:28 UTC
[05/51] [partial] incubator-nifi git commit: Initial code contribution
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
new file mode 100644
index 0000000..3af2098
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -0,0 +1,709 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.controller.repository.ConnectionSwapInfo;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.QueueProvider;
+import org.apache.nifi.controller.repository.StandardFlowFileRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.io.BufferedOutputStream;
+import org.apache.nifi.processor.QueueSize;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * An implementation of the {@link FlowFileSwapManager} that swaps FlowFiles
+ * to/from local disk
+ * </p>
+ */
+public class FileSystemSwapManager implements FlowFileSwapManager {
+
+ // A queue is only swapped out while its swap queue holds at least this many FlowFiles (see SwapOutTask)
+ public static final int MINIMUM_SWAP_COUNT = 10000;
+ // Swap file names have the form <digits>-<anything>.swap; SwapOutTask generates <current millis>-<random UUID>.swap
+ private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
+ // Written as the first int of every swap file; files carrying a larger version are rejected on read
+ public static final int SWAP_ENCODING_VERSION = 6;
+
+ // Runs the QueueIdentifier task that fans swap-out work out to worker threads
+ private final ScheduledExecutorService swapQueueIdentifierExecutor;
+ // Runs the SwapInTask
+ private final ScheduledExecutorService swapInExecutor;
+ // Assigned in start(); read from the swap-in and swap-out tasks
+ private volatile FlowFileRepository flowFileRepository;
+
+ // Maintains a mapping of FlowFile Queue to a QueueLockWrapper, which provides queue locking and necessary state for swapping back in
+ private final ConcurrentMap<FlowFileQueue, QueueLockWrapper> swapMap = new ConcurrentHashMap<>();
+ // Directory in which swap files are created, listed and deleted
+ private final File storageDirectory;
+ // Delay, in milliseconds, between runs of the swap-in task
+ private final long swapInMillis;
+ // Delay, in milliseconds, between runs of the swap-out queue identifier task
+ private final long swapOutMillis;
+ // Number of worker threads created for each swap-out run
+ private final int swapOutThreadCount;
+
+ // Assigned once in start()
+ private ContentClaimManager claimManager; // effectively final
+
+ private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class);
+
+    /**
+     * Creates the swap manager, resolving the swap storage directory, the
+     * swap-in/swap-out periods and the thread counts from {@link NiFiProperties}.
+     * The periodic swap tasks themselves are only scheduled by
+     * {@code start(...)}.
+     *
+     * @throws RuntimeException if the swap storage directory does not exist and cannot be created
+     */
+    public FileSystemSwapManager() {
+        final File swapDirectory = NiFiProperties.getInstance().getSwapStorageLocation();
+        this.storageDirectory = swapDirectory;
+
+        final boolean directoryAvailable = swapDirectory.exists() || swapDirectory.mkdirs();
+        if (!directoryAvailable) {
+            throw new RuntimeException("Cannot create Swap Storage directory " + swapDirectory.getAbsolutePath());
+        }
+
+        this.swapQueueIdentifierExecutor = new FlowEngine(1, "Identifies Queues for FlowFile Swapping");
+
+        final NiFiProperties nifiProperties = NiFiProperties.getInstance();
+        this.swapInMillis = FormatUtils.getTimeDuration(nifiProperties.getSwapInPeriod(), TimeUnit.MILLISECONDS);
+        this.swapOutMillis = FormatUtils.getTimeDuration(nifiProperties.getSwapOutPeriod(), TimeUnit.MILLISECONDS);
+        this.swapOutThreadCount = nifiProperties.getSwapOutThreads();
+        this.swapInExecutor = new FlowEngine(nifiProperties.getSwapInThreads(), "Swap In FlowFiles");
+    }
+
+    /**
+     * Deletes every file in the swap storage directory whose name matches the
+     * swap file naming pattern. A file that could not be deleted and still
+     * exists is reported at WARN level; processing continues with the next file.
+     */
+    @Override
+    public void purge() {
+        final FilenameFilter swapFileFilter = new FilenameFilter() {
+            @Override
+            public boolean accept(final File dir, final String name) {
+                return SWAP_FILE_PATTERN.matcher(name).matches();
+            }
+        };
+
+        final File[] candidates = storageDirectory.listFiles(swapFileFilter);
+        if (candidates == null) {
+            // listFiles yields null when the directory cannot be listed; nothing to purge
+            return;
+        }
+
+        for (final File candidate : candidates) {
+            final boolean deleted = candidate.delete();
+            if (deleted || !candidate.exists()) {
+                continue;
+            }
+            logger.warn("Failed to delete SWAP file {}", candidate);
+        }
+    }
+
+    /**
+     * Stores the collaborators needed for swapping and schedules the two
+     * periodic tasks: the queue identifier (swap-out) and the swap-in task.
+     * Each task first runs after one full period and then repeats with that
+     * same period as a fixed delay.
+     *
+     * @param flowFileRepository the repository that is notified when FlowFiles are swapped in or out
+     * @param connectionProvider supplies the queues that are examined for swap-out
+     * @param claimManager used to create content claims when swap files are read back
+     */
+    public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager) {
+        this.claimManager = claimManager;
+        this.flowFileRepository = flowFileRepository;
+
+        final Runnable swapOutCoordinator = new QueueIdentifier(connectionProvider);
+        swapQueueIdentifierExecutor.scheduleWithFixedDelay(swapOutCoordinator, swapOutMillis, swapOutMillis, TimeUnit.MILLISECONDS);
+
+        final Runnable swapInWorker = new SwapInTask();
+        swapInExecutor.scheduleWithFixedDelay(swapInWorker, swapInMillis, swapInMillis, TimeUnit.MILLISECONDS);
+    }
+
+ /**
+ * Writes the given FlowFiles to {@code destination} using swap encoding
+ * version {@link #SWAP_ENCODING_VERSION}.
+ * <p>
+ * The header consists of the encoding version (int), the queue identifier
+ * (UTF), the number of records (int) and the summed size of all records
+ * (long). One entry per FlowFile follows. The stream is flushed but not
+ * closed by this method.
+ * </p>
+ *
+ * @param toSwap the FlowFiles to write; if null or empty, nothing is written
+ * @param queue the queue whose identifier is stored in the header
+ * @param swapLocation only used in the log message
+ * @param destination the stream the records are written to
+ * @return the number of FlowFiles written, or 0 if {@code toSwap} is null or empty
+ * @throws IOException if writing to or flushing the stream fails
+ */
+ public int serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException {
+ if (toSwap == null || toSwap.isEmpty()) {
+ return 0;
+ }
+
+ // the total size is part of the header, so it must be known before the first record is written
+ long contentSize = 0L;
+ for (final FlowFileRecord record : toSwap) {
+ contentSize += record.getSize();
+ }
+
+ // persist record to disk via the swap file
+ final OutputStream bufferedOut = new BufferedOutputStream(destination);
+ final DataOutputStream out = new DataOutputStream(bufferedOut);
+ try {
+ // header: version, owning queue, record count, total content size
+ out.writeInt(SWAP_ENCODING_VERSION);
+ out.writeUTF(queue.getIdentifier());
+ out.writeInt(toSwap.size());
+ out.writeLong(contentSize);
+
+ // the field order below must match the read order in deserializeFlowFiles
+ for (final FlowFileRecord flowFile : toSwap) {
+ out.writeLong(flowFile.getId());
+ out.writeLong(flowFile.getEntryDate());
+
+ final Set<String> lineageIdentifiers = flowFile.getLineageIdentifiers();
+ out.writeInt(lineageIdentifiers.size());
+ for (final String lineageId : lineageIdentifiers) {
+ out.writeUTF(lineageId);
+ }
+
+ out.writeLong(flowFile.getLineageStartDate());
+ out.writeLong(flowFile.getLastQueueDate());
+ out.writeLong(flowFile.getSize());
+
+ // a boolean flag tells the reader whether claim details follow
+ final ContentClaim claim = flowFile.getContentClaim();
+ if (claim == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ out.writeUTF(claim.getId());
+ out.writeUTF(claim.getContainer());
+ out.writeUTF(claim.getSection());
+ out.writeLong(flowFile.getContentClaimOffset());
+ out.writeBoolean(claim.isLossTolerant());
+ }
+
+ // attributes use writeString rather than writeUTF, which supports values of 65535 bytes or more
+ final Map<String, String> attributes = flowFile.getAttributes();
+ out.writeInt(attributes.size());
+ for (final Map.Entry<String, String> entry : attributes.entrySet()) {
+ writeString(entry.getKey(), out);
+ writeString(entry.getValue(), out);
+ }
+ }
+ } finally {
+ // push buffered bytes through to the destination; closing the destination is left to the caller
+ out.flush();
+ }
+
+ logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", new Object[]{toSwap.size(), queue, swapLocation});
+
+ return toSwap.size();
+ }
+
+    /**
+     * Writes a length-prefixed UTF-8 string.
+     * <p>
+     * Strings shorter than 65535 bytes get a 2-byte big-endian length. Longer
+     * strings get the marker bytes {@code 0xFF 0xFF} followed by a 4-byte
+     * big-endian length. The encoded bytes follow the length in both cases.
+     * </p>
+     *
+     * @param toWrite the string to write
+     * @param out the stream to write to
+     * @throws IOException if writing to the stream fails
+     */
+    private void writeString(final String toWrite, final OutputStream out) throws IOException {
+        final byte[] encoded = toWrite.getBytes("UTF-8");
+        final int encodedLength = encoded.length;
+
+        if (encodedLength >= 65535) {
+            // marker announcing a 4-byte length, followed by the two high-order length bytes
+            out.write(255);
+            out.write(255);
+            out.write(encodedLength >>> 24);
+            out.write(encodedLength >>> 16);
+        }
+
+        // the two low-order length bytes are written in both forms
+        out.write(encodedLength >>> 8);
+        out.write(encodedLength);
+        out.write(encoded);
+    }
+
+    /**
+     * Reads a complete swap file: validates the header and then reads all
+     * records it announces. Claimant counts are not incremented.
+     *
+     * @param in the stream positioned at the start of the swap file
+     * @param queue the queue the records are expected to belong to
+     * @param claimManager used to create the content claims of the records
+     * @return the FlowFiles contained in the swap file
+     * @throws IOException if the encoding version is newer than {@link #SWAP_ENCODING_VERSION} or reading fails
+     * @throws IllegalArgumentException if the file was written for a different connection than {@code queue}
+     */
+    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final FlowFileQueue queue, final ContentClaimManager claimManager) throws IOException {
+        final int encodingVersion = in.readInt();
+        final boolean versionSupported = encodingVersion <= SWAP_ENCODING_VERSION;
+        if (!versionSupported) {
+            throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is "
+                    + encodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)");
+        }
+
+        final String recordedConnectionId = in.readUTF();
+        final boolean belongsToQueue = recordedConnectionId.equals(queue.getIdentifier());
+        if (!belongsToQueue) {
+            throw new IllegalArgumentException("Cannot restore Swap File because the file indicates that records belong to Connection with ID " + recordedConnectionId + " but received Connection " + queue);
+        }
+
+        final int recordCount = in.readInt();
+        // the total content size is part of the header but is not needed here; it is read only to advance the stream
+        in.readLong();
+
+        return deserializeFlowFiles(in, recordCount, queue, encodingVersion, false, claimManager);
+    }
+
+ /**
+ * Reads {@code numFlowFiles} records from a stream that is positioned just
+ * after the swap file header. Which fields are present in a record depends
+ * on {@code serializationVersion}; the checks below handle versions older
+ * than the current one.
+ *
+ * @param in the stream to read the records from
+ * @param numFlowFiles the number of records to read
+ * @param queue not used by this method
+ * @param serializationVersion the encoding version taken from the swap file header
+ * @param incrementContentClaims if true, the claimant count of every content claim that is read is incremented
+ * @param claimManager used to create (and optionally increment) content claims
+ * @return the records in the order in which they were read
+ * @throws IOException if reading fails or a pre-version-3 record does not start with the expected action byte
+ */
+ static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles, final FlowFileQueue queue, final int serializationVersion, final boolean incrementContentClaims, final ContentClaimManager claimManager) throws IOException {
+ final List<FlowFileRecord> flowFiles = new ArrayList<>();
+ for (int i = 0; i < numFlowFiles; i++) {
+ // legacy encoding had an "action" because it used to be coupled with FlowFile Repository code
+ if (serializationVersion < 3) {
+ final int action = in.read();
+ if (action != 1) {
+ throw new IOException("Swap File is version " + serializationVersion + " but did not contain a 'UPDATE' record type");
+ }
+ }
+
+ final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
+ ffBuilder.id(in.readLong());
+ ffBuilder.entryDate(in.readLong());
+
+ if (serializationVersion > 1) {
+ // Lineage information was added in version 2
+ final int numLineageIdentifiers = in.readInt();
+ final Set<String> lineageIdentifiers = new HashSet<>(numLineageIdentifiers);
+ for (int lineageIdIdx = 0; lineageIdIdx < numLineageIdentifiers; lineageIdIdx++) {
+ lineageIdentifiers.add(in.readUTF());
+ }
+ ffBuilder.lineageIdentifiers(lineageIdentifiers);
+ ffBuilder.lineageStartDate(in.readLong());
+
+ // the last queue date is only present from version 6 on
+ if (serializationVersion > 5) {
+ ffBuilder.lastQueueDate(in.readLong());
+ }
+ }
+
+ ffBuilder.size(in.readLong());
+
+ // versions before 3 stored a connection id per record; it is read and discarded
+ if (serializationVersion < 3) {
+ readString(in); // connection Id
+ }
+
+ final boolean hasClaim = in.readBoolean();
+ if (hasClaim) {
+ // versions before 5 stored the claim id as a long rather than as a string
+ final String claimId;
+ if (serializationVersion < 5) {
+ claimId = String.valueOf(in.readLong());
+ } else {
+ claimId = in.readUTF();
+ }
+
+ final String container = in.readUTF();
+ final String section = in.readUTF();
+ final long claimOffset = in.readLong();
+
+ // the loss-tolerant flag is present from version 4 on; older records are treated as not loss tolerant
+ final boolean lossTolerant;
+ if (serializationVersion >= 4) {
+ lossTolerant = in.readBoolean();
+ } else {
+ lossTolerant = false;
+ }
+
+ final ContentClaim claim = claimManager.newContentClaim(container, section, claimId, lossTolerant);
+
+ if (incrementContentClaims) {
+ claimManager.incrementClaimantCount(claim);
+ }
+
+ ffBuilder.contentClaim(claim);
+ ffBuilder.contentClaimOffset(claimOffset);
+ }
+
+ // versions before 3 flagged whether attributes follow; from version 3 on they always do
+ boolean attributesChanged = true;
+ if (serializationVersion < 3) {
+ attributesChanged = in.readBoolean();
+ }
+
+ if (attributesChanged) {
+ final int numAttributes = in.readInt();
+ for (int j = 0; j < numAttributes; j++) {
+ final String key = readString(in);
+ final String value = readString(in);
+
+ ffBuilder.addAttribute(key, value);
+ }
+ }
+
+ final FlowFileRecord record = ffBuilder.build();
+ flowFiles.add(record);
+ }
+
+ return flowFiles;
+ }
+
+    /**
+     * Reads a length-prefixed UTF-8 string as written by {@code writeString}.
+     *
+     * @param in the stream to read from
+     * @return the decoded string
+     * @throws EOFException if the stream ends before the length prefix or before all announced bytes were read
+     * @throws IOException if the length prefix is negative (which indicates a corrupt stream) or reading fails
+     */
+    private static String readString(final InputStream in) throws IOException {
+        final Integer numBytes = readFieldLength(in);
+        if (numBytes == null) {
+            throw new EOFException();
+        }
+        if (numBytes < 0) {
+            // A 4-byte length with the high bit set decodes to a negative int. Report it as an
+            // I/O problem so that callers catching IOException handle the corrupt file, instead
+            // of failing with an unchecked NegativeArraySizeException.
+            throw new IOException("Invalid string length " + numBytes + " read from stream; the data appears to be corrupt");
+        }
+
+        final byte[] bytes = new byte[numBytes];
+        fillBuffer(in, bytes, numBytes);
+        return new String(bytes, "UTF-8");
+    }
+
+    /**
+     * Reads the length prefix of a string field.
+     * <p>
+     * The prefix is either a 2-byte big-endian length, or the marker bytes
+     * {@code 0xFF 0xFF} followed by a 4-byte big-endian length.
+     * </p>
+     *
+     * @param in the stream to read from
+     * @return the decoded length, or {@code null} if the stream was already at its end
+     * @throws EOFException if the stream ends in the middle of the prefix
+     * @throws IOException if reading fails
+     */
+    private static Integer readFieldLength(final InputStream in) throws IOException {
+        // both prefix bytes are requested before either of them is inspected
+        final int highByte = in.read();
+        final int lowByte = in.read();
+
+        if (highByte < 0) {
+            return null;
+        }
+        if (lowByte < 0) {
+            throw new EOFException();
+        }
+
+        final boolean extendedLength = highByte == 0xff && lowByte == 0xff;
+        if (!extendedLength) {
+            return (highByte << 8) | lowByte;
+        }
+
+        final int b1 = in.read();
+        final int b2 = in.read();
+        final int b3 = in.read();
+        final int b4 = in.read();
+        if (b1 < 0 || b2 < 0 || b3 < 0 || b4 < 0) {
+            throw new EOFException();
+        }
+        return (b1 << 24) | (b2 << 16) | (b3 << 8) | b4;
+    }
+
+    /**
+     * Reads exactly {@code length} bytes from the stream into the start of
+     * {@code buffer}.
+     *
+     * @param in the stream to read from
+     * @param buffer the destination array
+     * @param length the number of bytes that must be read
+     * @throws EOFException if reading stopped before {@code length} bytes were obtained
+     * @throws IOException if reading fails
+     */
+    private static void fillBuffer(final InputStream in, final byte[] buffer, final int length) throws IOException {
+        int filled = 0;
+        // keep reading until a read yields nothing more (end of stream, or no bytes left to request)
+        for (int count = in.read(buffer, filled, length - filled); count > 0; count = in.read(buffer, filled, length - filled)) {
+            filled += count;
+        }
+
+        if (filled != length) {
+            throw new EOFException();
+        }
+    }
+
+    /**
+     * Periodic task that hands every queue known to the {@link QueueProvider}
+     * to a set of {@link SwapOutTask} workers and waits for them to finish.
+     */
+    private class QueueIdentifier implements Runnable {
+
+        private final QueueProvider connectionProvider;
+
+        public QueueIdentifier(final QueueProvider connectionProvider) {
+            this.connectionProvider = connectionProvider;
+        }
+
+        @Override
+        public void run() {
+            // all workers drain the same queue, so each FlowFile queue is handled by exactly one worker
+            final Collection<FlowFileQueue> allQueues = connectionProvider.getAllQueues();
+            final BlockingQueue<FlowFileQueue> connectionQueue = new LinkedBlockingQueue<>(allQueues);
+
+            final ThreadFactory threadFactory = new ThreadFactory() {
+                @Override
+                public Thread newThread(final Runnable r) {
+                    final Thread t = new Thread(r);
+                    t.setName("Swap Out FlowFiles");
+                    return t;
+                }
+            };
+
+            final ExecutorService workerExecutor = Executors.newFixedThreadPool(swapOutThreadCount, threadFactory);
+            for (int i = 0; i < swapOutThreadCount; i++) {
+                workerExecutor.submit(new SwapOutTask(connectionQueue));
+            }
+
+            // no further tasks are accepted; the workers exit once the connection queue is empty
+            workerExecutor.shutdown();
+
+            try {
+                workerExecutor.awaitTermination(10, TimeUnit.MINUTES);
+            } catch (final InterruptedException e) {
+                // Stop waiting, but keep the interrupt visible to the executor running this task
+                // (shutdown() interrupts it via shutdownNow()) instead of silently clearing it.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Periodic task that reads swap files back into their queues. A queue is
+     * only considered when its unswapped size is below 60% of its swap
+     * threshold, and it is then refilled until it reaches 90% of the threshold
+     * or no swap files remain.
+     */
+    private class SwapInTask implements Runnable {
+
+        @Override
+        public void run() {
+            for (final Map.Entry<FlowFileQueue, QueueLockWrapper> entry : swapMap.entrySet()) {
+                final FlowFileQueue flowFileQueue = entry.getKey();
+
+                // if queue is more than 60% of its swap threshold, don't swap flowfiles in
+                if (flowFileQueue.unswappedSize() >= ((float) flowFileQueue.getSwapThreshold() * 0.6F)) {
+                    continue;
+                }
+
+                final QueueLockWrapper queueLockWrapper = entry.getValue();
+                if (!queueLockWrapper.getLock().tryLock()) {
+                    // the lock is held elsewhere; try this queue again on the next run
+                    continue;
+                }
+
+                try {
+                    swapIn(flowFileQueue, queueLockWrapper.getQueue());
+                } finally {
+                    queueLockWrapper.getLock().unlock();
+                }
+            }
+        }
+
+        /**
+         * Swaps files in until the queue reaches 90% of its swap threshold, no
+         * swap files are left, or a swap file fails to load.
+         *
+         * @param flowFileQueue the queue to refill
+         * @param swapFiles the swap files waiting to be restored into that queue
+         */
+        private void swapIn(final FlowFileQueue flowFileQueue, final Queue<File> swapFiles) {
+            // Swap FlowFiles in until we hit 90% of the threshold, or until we're out of files.
+            while (flowFileQueue.unswappedSize() < ((float) flowFileQueue.getSwapThreshold() * 0.9F)) {
+                final File swapFile = swapFiles.poll();
+                if (swapFile == null) {
+                    return;
+                }
+
+                try {
+                    // buffered, because DataInputStream issues many small reads
+                    try (final InputStream fis = new FileInputStream(swapFile);
+                            final InputStream bufferedIn = new BufferedInputStream(fis);
+                            final DataInputStream in = new DataInputStream(bufferedIn)) {
+                        final List<FlowFileRecord> swappedFlowFiles = deserializeFlowFiles(in, flowFileQueue, claimManager);
+                        flowFileRepository.swapFlowFilesIn(swapFile.getAbsolutePath(), swappedFlowFiles, flowFileQueue);
+                        flowFileQueue.putSwappedRecords(swappedFlowFiles);
+                    }
+
+                    if (!swapFile.delete()) {
+                        logger.warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file can be cleaned up manually");
+                    }
+                } catch (final Exception e) {
+                    logger.error("Failed to Swap In FlowFiles for {} due to {}", new Object[]{flowFileQueue, e.toString()}, e);
+                    swapFiles.add(swapFile);
+
+                    // Stop working on this queue until the next scheduled run. Continuing here would
+                    // poll the re-queued file again right away and, if the file is permanently
+                    // unreadable, spin forever while holding the queue lock.
+                    return;
+                }
+            }
+        }
+    }
+
+    /**
+     * Worker that takes queues from a shared work queue and, for each of them,
+     * writes swappable FlowFiles to swap files for as long as the queue's swap
+     * queue holds at least {@link #MINIMUM_SWAP_COUNT} FlowFiles.
+     */
+    private class SwapOutTask implements Runnable {
+
+        private final BlockingQueue<FlowFileQueue> connectionQueue;
+
+        public SwapOutTask(final BlockingQueue<FlowFileQueue> connectionQueue) {
+            this.connectionQueue = connectionQueue;
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                final FlowFileQueue flowFileQueue = connectionQueue.poll();
+                if (flowFileQueue == null) {
+                    logger.debug("No more FlowFile Queues to Swap Out");
+                    return;
+                }
+
+                if (logger.isDebugEnabled()) {
+                    logger.debug("{} has {} FlowFiles to swap out", flowFileQueue, flowFileQueue.getSwapQueueSize());
+                }
+
+                while (flowFileQueue.getSwapQueueSize() >= MINIMUM_SWAP_COUNT) {
+                    final File swapFile = new File(storageDirectory, System.currentTimeMillis() + "-" + UUID.randomUUID().toString() + ".swap");
+                    final String swapLocation = swapFile.getAbsolutePath();
+                    final List<FlowFileRecord> toSwap = flowFileQueue.pollSwappableRecords();
+
+                    int recordsSwapped;
+                    try (final FileOutputStream fos = new FileOutputStream(swapFile)) {
+                        recordsSwapped = serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
+                        // Force the data to disk before the repository is told about the swap
+                        // location, so the repository never refers to a file whose content has
+                        // not been synced yet.
+                        fos.getFD().sync();
+                        flowFileRepository.swapFlowFilesOut(toSwap, flowFileQueue, swapLocation);
+                    } catch (final IOException ioe) {
+                        recordsSwapped = 0;
+                        // hand the records back to the queue so that they are not lost
+                        flowFileQueue.putSwappedRecords(toSwap);
+                        logger.error("Failed to swap out {} FlowFiles from {} to Swap File {} due to {}", new Object[]{toSwap.size(), flowFileQueue, swapLocation, ioe.toString()}, ioe);
+                    }
+
+                    if (recordsSwapped > 0) {
+                        // remember the file so that the swap-in task can restore it later
+                        QueueLockWrapper swapQueue = swapMap.get(flowFileQueue);
+                        if (swapQueue == null) {
+                            swapQueue = new QueueLockWrapper(new LinkedBlockingQueue<File>());
+                            QueueLockWrapper oldQueue = swapMap.putIfAbsent(flowFileQueue, swapQueue);
+                            if (oldQueue != null) {
+                                swapQueue = oldQueue;
+                            }
+                        }
+
+                        swapQueue.getQueue().add(swapFile);
+                    } else if (!swapFile.delete() && swapFile.exists()) {
+                        // nothing usable was written; a leftover file would be picked up again on recovery
+                        logger.warn("Failed to delete unused Swap File {}", swapFile);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Recovers FlowFiles from all Swap Files, returning the largest FlowFile ID
+     * that was recovered.
+     * <p>
+     * Every readable swap file is registered with the queue named in its
+     * header, and the claimant count of each content claim found in it is
+     * incremented. Swap files that cannot be read, or that belong to an
+     * unknown connection, are logged and skipped.
+     * </p>
+     *
+     * @param queueProvider supplies the queues that swap files may belong to
+     * @param claimManager used to create content claims and increment their claimant counts
+     * @return the largest FlowFile ID found in any recovered swap file, or 0 if none was recovered
+     */
+    @Override
+    public long recoverSwappedFlowFiles(final QueueProvider queueProvider, final ContentClaimManager claimManager) {
+        final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() {
+            @Override
+            public boolean accept(final File dir, final String name) {
+                return SWAP_FILE_PATTERN.matcher(name).matches();
+            }
+        });
+
+        if (swapFiles == null) {
+            return 0L;
+        }
+
+        final Collection<FlowFileQueue> allQueues = queueProvider.getAllQueues();
+        final Map<String, FlowFileQueue> queueMap = new HashMap<>();
+        for (final FlowFileQueue queue : allQueues) {
+            queueMap.put(queue.getIdentifier(), queue);
+        }
+
+        final ConnectionSwapInfo swapInfo = new ConnectionSwapInfo();
+        int swappedCount = 0;
+        long swappedBytes = 0L;
+        long maxRecoveredId = 0L;
+
+        for (final File swapFile : swapFiles) {
+            // read the records from disk via the swap file
+            try (final InputStream fis = new FileInputStream(swapFile);
+                    final InputStream bufferedIn = new BufferedInputStream(fis);
+                    final DataInputStream in = new DataInputStream(bufferedIn)) {
+
+                final int swapEncodingVersion = in.readInt();
+                if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
+                    throw new IOException("Cannot swap FlowFiles in from " + swapFile + " because the encoding version is "
+                            + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)");
+                }
+
+                final String connectionId = in.readUTF();
+                final FlowFileQueue queue = queueMap.get(connectionId);
+                if (queue == null) {
+                    logger.error("Cannot recover Swapped FlowFiles from Swap File {} because the FlowFiles belong to a Connection with ID {} and that Connection does not exist", swapFile, connectionId);
+                    continue;
+                }
+
+                final int numRecords = in.readInt();
+                final long contentSize = in.readLong();
+
+                final List<FlowFileRecord> records = deserializeFlowFiles(in, numRecords, queue, swapEncodingVersion, true, claimManager);
+
+                // Register the file and count its records only once it has been read completely,
+                // so that a truncated or corrupt file is neither attached to the queue nor
+                // included in the recovery totals.
+                swapInfo.addSwapSizeInfo(connectionId, swapFile.getAbsolutePath(), new QueueSize(numRecords, contentSize));
+                swappedCount += numRecords;
+                swappedBytes += contentSize;
+
+                for (final FlowFileRecord record : records) {
+                    if (record.getId() > maxRecoveredId) {
+                        maxRecoveredId = record.getId();
+                    }
+                }
+            } catch (final IOException ioe) {
+                logger.error("Cannot recover Swapped FlowFiles from Swap File {} due to {}", swapFile, ioe.toString());
+                // the stack trace is only emitted when debug logging is enabled
+                if (logger.isDebugEnabled()) {
+                    logger.error("", ioe);
+                }
+            }
+        }
+
+        restoreSwapLocations(queueMap.values(), swapInfo);
+        logger.info("Recovered {} FlowFiles ({} bytes) from Swap Files", swappedCount, swappedBytes);
+        return maxRecoveredId;
+    }
+
+    /**
+     * Attaches previously written swap files to their queues. For each queue
+     * that has swap files recorded in {@code swapInfo}, the files are appended
+     * to the queue's pending swap-in list in {@link SwapFileComparator} order
+     * and the queue's swap count is incremented by the size recorded for each
+     * file.
+     *
+     * @param flowFileQueues the queues to restore swap locations for
+     * @param swapInfo the swap file locations and sizes, keyed by queue identifier
+     */
+    public void restoreSwapLocations(final Collection<FlowFileQueue> flowFileQueues, final ConnectionSwapInfo swapInfo) {
+        for (final FlowFileQueue queue : flowFileQueues) {
+            final String queueId = queue.getIdentifier();
+            final Collection<String> swapFileLocations = swapInfo.getSwapFileLocations(queueId);
+            if (swapFileLocations == null || swapFileLocations.isEmpty()) {
+                continue;
+            }
+
+            final SortedMap<String, QueueSize> sortedFileQueueMap = new TreeMap<>(new SwapFileComparator());
+            for (final String swapFileLocation : swapFileLocations) {
+                final QueueSize queueSize = swapInfo.getSwappedSize(queueId, swapFileLocation);
+                sortedFileQueueMap.put(swapFileLocation, queueSize);
+            }
+
+            QueueLockWrapper fileQueue = swapMap.get(queue);
+            if (fileQueue == null) {
+                // putIfAbsent, as in SwapOutTask, so that a wrapper registered concurrently by
+                // another thread is reused rather than overwritten
+                final QueueLockWrapper created = new QueueLockWrapper(new LinkedBlockingQueue<File>());
+                final QueueLockWrapper existing = swapMap.putIfAbsent(queue, created);
+                fileQueue = (existing == null) ? created : existing;
+            }
+
+            for (final Map.Entry<String, QueueSize> innerEntry : sortedFileQueueMap.entrySet()) {
+                final File swapFile = new File(innerEntry.getKey());
+                final QueueSize size = innerEntry.getValue();
+                fileQueue.getQueue().add(swapFile);
+                queue.incrementSwapCount(size.getObjectCount(), size.getByteCount());
+            }
+        }
+    }
+
+    /**
+     * Stops both schedulers by calling {@code shutdownNow()} on the queue
+     * identifier executor first and on the swap-in executor second.
+     */
+    public void shutdown() {
+        final ExecutorService[] executors = {swapQueueIdentifierExecutor, swapInExecutor};
+        for (final ExecutorService executor : executors) {
+            executor.shutdownNow();
+        }
+    }
+
+ private static class SwapFileComparator implements Comparator<String> {
+
+ @Override
+ public int compare(final String o1, final String o2) {
+ if (o1 == o2) {
+ return 0;
+ }
+
+ final Long time1 = getTimestampFromFilename(o1);
+ final Long time2 = getTimestampFromFilename(o2);
+
+ if (time1 == null && time2 == null) {
+ return 0;
+ }
+ if (time1 == null) {
+ return 1;
+ }
+ if (time2 == null) {
+ return -1;
+ }
+
+ final int timeComparisonValue = time1.compareTo(time2);
+ if (timeComparisonValue != 0) {
+ return timeComparisonValue;
+ }
+
+ return o1.compareTo(o2);
+ }
+
+ private Long getTimestampFromFilename(final String fullyQualifiedFilename) {
+ if (fullyQualifiedFilename == null) {
+ return null;
+ }
+
+ final File file = new File(fullyQualifiedFilename);
+ final String filename = file.getName();
+
+ final int idx = filename.indexOf("-");
+ if (idx < 1) {
+ return null;
+ }
+
+ final String millisVal = filename.substring(0, idx);
+ try {
+ return Long.parseLong(millisVal);
+ } catch (final NumberFormatException e) {
+ return null;
+ }
+ }
+ }
+
+    /**
+     * Pairs the swap files waiting to be restored for one FlowFile queue with
+     * the lock that is taken while they are being swapped in. Equality and
+     * hash code are delegated to the wrapped queue.
+     */
+    private static class QueueLockWrapper {
+
+        private final Queue<File> queue;
+        private final Lock lock = new ReentrantLock();
+
+        public QueueLockWrapper(final Queue<File> queue) {
+            this.queue = queue;
+        }
+
+        public Lock getLock() {
+            return lock;
+        }
+
+        public Queue<File> getQueue() {
+            return queue;
+        }
+
+        @Override
+        public boolean equals(final Object obj) {
+            if (!(obj instanceof QueueLockWrapper)) {
+                return false;
+            }
+            final QueueLockWrapper other = (QueueLockWrapper) obj;
+            return queue.equals(other.queue);
+        }
+
+        @Override
+        public int hashCode() {
+            return queue.hashCode();
+        }
+    }
+}