You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/26 19:54:36 UTC
nifi git commit: NIFI-1070: Added detailed debug-level logging about
how FileSystemRepository is choosing to expire archived data
Repository: nifi
Updated Branches:
refs/heads/master f8c3377c8 -> aec32a277
NIFI-1070: Added detailed debug-level logging about how FileSystemRepository is choosing to expire archived data
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/aec32a27
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/aec32a27
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/aec32a27
Branch: refs/heads/master
Commit: aec32a277c3f2b707edd20a9eff9c4984f4f28fa
Parents: f8c3377
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Oct 26 14:36:03 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Oct 26 14:36:03 2015 -0400
----------------------------------------------------------------------
.../repository/FileSystemRepository.java | 26 ++++++++++++++------
1 file changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/aec32a27/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 724e26e..72a50ec 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -87,6 +87,8 @@ public class FileSystemRepository implements ContentRepository {
public static final Pattern MAX_ARCHIVE_SIZE_PATTERN = Pattern.compile("\\d{1,2}%");
private static final Logger LOG = LoggerFactory.getLogger(FileSystemRepository.class);
+ private final Logger archiveExpirationLog = LoggerFactory.getLogger(FileSystemRepository.class.getName() + ".archive.expiration");
+
private final Map<String, Path> containers;
private final List<String> containerNames;
private final AtomicLong index;
@@ -151,7 +153,7 @@ public class FileSystemRepository implements ContentRepository {
if (maxArchiveSize == null) {
throw new RuntimeException("No value specified for property '"
- + NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE + "' but archiving is enabled. You must configure the max disk usage in order to enable archiving.");
+ + NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE + "' but archiving is enabled. You must configure the max disk usage in order to enable archiving.");
}
if (!MAX_ARCHIVE_SIZE_PATTERN.matcher(maxArchiveSize.trim()).matches()) {
@@ -185,7 +187,7 @@ public class FileSystemRepository implements ContentRepository {
final long maxArchiveBytes = (long) (capacity * (1D - (maxArchiveRatio - 0.02)));
minUsableContainerBytesForArchive.put(container.getKey(), Long.valueOf(maxArchiveBytes));
LOG.info("Maximum Threshold for Container {} set to {} bytes; if volume exceeds this size, archived data will be deleted until it no longer exceeds this size",
- containerName, maxArchiveBytes);
+ containerName, maxArchiveBytes);
final long backPressureBytes = (long) (Files.getFileStore(container.getValue()).getTotalSpace() * archiveBackPressureRatio);
final ContainerState containerState = new ContainerState(containerName, true, backPressureBytes, capacity);
@@ -620,7 +622,7 @@ public class FileSystemRepository implements ContentRepository {
final File file = path.toFile();
if (!file.delete() && file.exists()) {
- LOG.warn("Unable to delete {} at path {}", new Object[]{claim, path});
+ LOG.warn("Unable to delete {} at path {}", new Object[] {claim, path});
return false;
}
@@ -1051,7 +1053,7 @@ public class FileSystemRepository implements ContentRepository {
break;
} else {
LOG.warn("Failed to clean up {} because old claims aren't being cleaned up fast enough. "
- + "This Content Claim will remain in the Content Repository until NiFi is restarted, at which point it will be cleaned up", claim);
+ + "This Content Claim will remain in the Content Repository until NiFi is restarted, at which point it will be cleaned up", claim);
}
}
} catch (final InterruptedException ie) {
@@ -1187,6 +1189,7 @@ public class FileSystemRepository implements ContentRepository {
}
private long destroyExpiredArchives(final String containerName, final Path container) throws IOException {
+ archiveExpirationLog.debug("Destroying Expired Archives for Container {}", containerName);
final List<ArchiveInfo> notYetExceedingThreshold = new ArrayList<>();
final long removalTimeThreshold = System.currentTimeMillis() - maxArchiveMillis;
long oldestArchiveDateFound = System.currentTimeMillis();
@@ -1194,6 +1197,7 @@ public class FileSystemRepository implements ContentRepository {
// determine how much space we must have in order to stop deleting old data
final Long minRequiredSpace = minUsableContainerBytesForArchive.get(containerName);
if (minRequiredSpace == null) {
+ archiveExpirationLog.debug("Could not determine minimum required space so will not destroy any archived data");
return -1L;
}
@@ -1204,6 +1208,7 @@ public class FileSystemRepository implements ContentRepository {
final long startNanos = System.nanoTime();
final long toFree = minRequiredSpace - usableSpace;
final BlockingQueue<ArchiveInfo> fileQueue = archivedFiles.get(containerName);
+ archiveExpirationLog.info("Currently {} bytes free for Container {}; requirement is {} byte free, so need to free {} bytes", usableSpace, containerName, minRequiredSpace, toFree);
ArchiveInfo toDelete;
int deleteCount = 0;
@@ -1217,7 +1222,7 @@ public class FileSystemRepository implements ContentRepository {
// In order to accomplish this, we just peek at the head and check if it should be deleted.
// If so, then we call poll() to remove it
if (freed < toFree || getLastModTime(toDelete.toPath()) < removalTimeThreshold) {
- toDelete = fileQueue.poll(); // remove the head of the queue, which is already stored in 'toDelete'
+ toDelete = fileQueue.poll(); // remove the head of the queue, which is already stored in 'toDelete'
Files.deleteIfExists(toDelete.toPath());
containerState.decrementArchiveCount();
LOG.debug("Deleted archived ContentClaim with ID {} from Container {} because the archival size was exceeding the max configured size", toDelete.getName(), containerName);
@@ -1229,9 +1234,12 @@ public class FileSystemRepository implements ContentRepository {
if (freed >= toFree) {
// If the last mod time indicates that it should be removed, just continue loop.
if (deleteBasedOnTimestamp(fileQueue, removalTimeThreshold)) {
+ archiveExpirationLog.debug("Freed enough space ({} bytes freed, needed to free {} bytes) but will continue to expire data based on timestamp", freed, toFree);
continue;
}
+ archiveExpirationLog.debug("Freed enough space ({} bytes freed, needed to free {} bytes). Finished expiring data", freed, toFree);
+
final ArchiveInfo archiveInfo = fileQueue.peek();
final long oldestArchiveDate = archiveInfo == null ? System.currentTimeMillis() : getLastModTime(archiveInfo.toPath());
@@ -1256,6 +1264,7 @@ public class FileSystemRepository implements ContentRepository {
}
// Go through each container and grab the archived data into a List
+ archiveExpirationLog.debug("Searching for more archived data to expire");
final StopWatch stopWatch = new StopWatch(true);
for (int i = 0; i < SECTIONS_PER_CONTAINER; i++) {
final Path sectionContainer = container.resolve(String.valueOf(i));
@@ -1278,7 +1287,7 @@ public class FileSystemRepository implements ContentRepository {
Files.deleteIfExists(file);
containerState.decrementArchiveCount();
LOG.debug("Deleted archived ContentClaim with ID {} from Container {} because it was older than the configured max archival duration",
- file.toFile().getName(), containerName);
+ file.toFile().getName(), containerName);
} catch (final IOException ioe) {
LOG.warn("Failed to remove archived ContentClaim with ID {} from Container {} due to {}", file.toFile().getName(), containerName, ioe.toString());
if (LOG.isDebugEnabled()) {
@@ -1312,6 +1321,7 @@ public class FileSystemRepository implements ContentRepository {
final long sortRemainingMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS) - deleteExpiredMillis;
// Delete the oldest data
+ archiveExpirationLog.debug("Deleting data based on timestamp");
final Iterator<ArchiveInfo> itr = notYetExceedingThreshold.iterator();
int counter = 0;
while (itr.hasNext()) {
@@ -1325,7 +1335,7 @@ public class FileSystemRepository implements ContentRepository {
// Check if we've freed enough space every 25 files that we destroy
if (++counter % 25 == 0) {
- if (getContainerUsableSpace(containerName) > minRequiredSpace) { // check if we can stop now
+ if (getContainerUsableSpace(containerName) > minRequiredSpace) { // check if we can stop now
LOG.debug("Finished cleaning up archive for Container {}", containerName);
break;
}
@@ -1360,7 +1370,7 @@ public class FileSystemRepository implements ContentRepository {
final long cleanupMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS) - deleteOldestMillis - sortRemainingMillis - deleteExpiredMillis;
LOG.debug("Oldest Archive Date for Container {} is {}; delete expired = {} ms, sort remaining = {} ms, delete oldest = {} ms, cleanup = {} ms",
- containerName, new Date(oldestContainerArchive), deleteExpiredMillis, sortRemainingMillis, deleteOldestMillis, cleanupMillis);
+ containerName, new Date(oldestContainerArchive), deleteExpiredMillis, sortRemainingMillis, deleteOldestMillis, cleanupMillis);
return oldestContainerArchive;
}