You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2017/04/07 18:46:28 UTC
nifi git commit: NIFI-3631: This closes #1613. Do not change Active
Directory in IndexDirectoryManager when it becomes full. Instead,
wait until it is committed and is removed by the onIndexCommitted method. This
resolved a bug where the index can exceed the configured limit [...]
Repository: nifi
Updated Branches:
refs/heads/master a5d630672 -> 778ba3957
NIFI-3631: This closes #1613. Do not change the active directory in IndexDirectoryManager when it becomes full. Instead, wait until it is committed and removed by the onIndexCommitted method. This resolves a bug where an index could exceed the configured size limit before being committed and, as a result, would no longer be the active index. Because of that, the IndexWriter was never closed or removed from the IndexManager, which created a memory leak.
Signed-off-by: joewitt <jo...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/778ba395
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/778ba395
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/778ba395
Branch: refs/heads/master
Commit: 778ba3957e7d1b9ddc105b1655f53e85cf6ec2ab
Parents: a5d6306
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Mar 20 17:18:01 2017 -0400
Committer: joewitt <jo...@apache.org>
Committed: Fri Apr 7 14:42:18 2017 -0400
----------------------------------------------------------------------
.../index/lucene/IndexDirectoryManager.java | 9 ++--
.../provenance/index/lucene/IndexLocation.java | 21 +-------
.../index/lucene/TestIndexDirectoryManager.java | 51 +++++++++++++++++++-
3 files changed, 55 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/778ba395/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java
index 09878ff..53f74e0 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java
@@ -75,7 +75,7 @@ public class IndexDirectoryManager {
final long startTime = DirectoryUtils.getIndexTimestamp(indexDir);
final List<IndexLocation> dirsForTimestamp = indexLocationByTimestamp.computeIfAbsent(startTime, t -> new ArrayList<>());
- final IndexLocation indexLoc = new IndexLocation(indexDir, startTime, partitionName, repoConfig.getDesiredIndexSize());
+ final IndexLocation indexLoc = new IndexLocation(indexDir, startTime, partitionName);
dirsForTimestamp.add(indexLoc);
final Tuple<Long, IndexLocation> tuple = latestIndexByStorageDir.get(storageDir);
@@ -99,8 +99,7 @@ public class IndexDirectoryManager {
final Map.Entry<Long, List<IndexLocation>> entry = itr.next();
final List<IndexLocation> locations = entry.getValue();
- final IndexLocation locToRemove = new IndexLocation(directory, DirectoryUtils.getIndexTimestamp(directory),
- directory.getName(), repoConfig.getDesiredIndexSize());
+ final IndexLocation locToRemove = new IndexLocation(directory, DirectoryUtils.getIndexTimestamp(directory), directory.getName());
locations.remove(locToRemove);
if (locations.isEmpty()) {
itr.remove();
@@ -334,8 +333,8 @@ public class IndexDirectoryManager {
*/
public synchronized File getWritableIndexingDirectory(final long earliestTimestamp, final String partitionName) {
IndexLocation indexLoc = activeIndices.get(partitionName);
- if (indexLoc == null || indexLoc.isIndexFull()) {
- indexLoc = new IndexLocation(createIndex(earliestTimestamp, partitionName), earliestTimestamp, partitionName, repoConfig.getDesiredIndexSize());
+ if (indexLoc == null) {
+ indexLoc = new IndexLocation(createIndex(earliestTimestamp, partitionName), earliestTimestamp, partitionName);
logger.debug("Created new Index Directory {}", indexLoc);
indexLocationByTimestamp.computeIfAbsent(earliestTimestamp, t -> new ArrayList<>()).add(indexLoc);
http://git-wip-us.apache.org/repos/asf/nifi/blob/778ba395/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java
index 33867c6..f7de84f 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java
@@ -18,24 +18,16 @@
package org.apache.nifi.provenance.index.lucene;
import java.io.File;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.provenance.util.DirectoryUtils;
public class IndexLocation {
- private static final long SIZE_CHECK_MILLIS = TimeUnit.SECONDS.toMillis(30L);
-
private final File indexDirectory;
private final long indexStartTimestamp;
private final String partitionName;
- private final long desiredIndexSize;
- private volatile long lastSizeCheckTime = System.currentTimeMillis();
- public IndexLocation(final File indexDirectory, final long indexStartTimestamp, final String partitionName, final long desiredIndexSize) {
+ public IndexLocation(final File indexDirectory, final long indexStartTimestamp, final String partitionName) {
this.indexDirectory = indexDirectory;
this.indexStartTimestamp = indexStartTimestamp;
this.partitionName = partitionName;
- this.desiredIndexSize = desiredIndexSize;
}
public File getIndexDirectory() {
@@ -50,17 +42,6 @@ public class IndexLocation {
return partitionName;
}
- public boolean isIndexFull() {
- final long now = System.currentTimeMillis();
- final long millisSinceLastSizeCheck = now - lastSizeCheckTime;
- if (millisSinceLastSizeCheck < SIZE_CHECK_MILLIS) {
- return false;
- }
-
- lastSizeCheckTime = now;
- return DirectoryUtils.getSize(indexDirectory) >= desiredIndexSize;
- }
-
@Override
public int hashCode() {
return 31 + 41 * indexDirectory.hashCode();
http://git-wip-us.apache.org/repos/asf/nifi/blob/778ba395/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java
index 3f3c422..efcb601 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java
@@ -21,6 +21,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -82,12 +85,58 @@ public class TestIndexDirectoryManager {
}
+ @Test
+ public void testActiveIndexNotLostWhenSizeExceeded() throws IOException, InterruptedException {
+ final RepositoryConfiguration config = createConfig(2);
+ config.setDesiredIndexSize(4096 * 128);
+
+ final File storageDir1 = config.getStorageDirectories().get("1");
+ final File storageDir2 = config.getStorageDirectories().get("2");
+
+ final File index1 = new File(storageDir1, "index-1");
+ final File index2 = new File(storageDir1, "index-2");
+ final File index3 = new File(storageDir2, "index-3");
+ final File index4 = new File(storageDir2, "index-4");
+
+ final File[] allIndices = new File[] {index1, index2, index3, index4};
+ for (final File file : allIndices) {
+ assertTrue(file.mkdirs() || file.exists());
+ }
+
+ try {
+ final IndexDirectoryManager mgr = new IndexDirectoryManager(config);
+ mgr.initialize();
+
+ File indexDir = mgr.getWritableIndexingDirectory(System.currentTimeMillis(), "1");
+ final File newFile = new File(indexDir, "1.bin");
+ try (final OutputStream fos = new FileOutputStream(newFile)) {
+ final byte[] data = new byte[4096];
+ for (int i = 0; i < 1024; i++) {
+ fos.write(data);
+ }
+ }
+
+ try {
+ final File newDir = mgr.getWritableIndexingDirectory(System.currentTimeMillis(), "1");
+ assertEquals(indexDir, newDir);
+ } finally {
+ newFile.delete();
+ }
+ } finally {
+ for (final File file : allIndices) {
+ file.delete();
+ }
+ }
+ }
+
+
+
private IndexLocation createLocation(final long timestamp) {
return createLocation(timestamp, "1");
}
private IndexLocation createLocation(final long timestamp, final String partitionName) {
- return new IndexLocation(new File("index-" + timestamp), timestamp, partitionName, 1024 * 1024L);
+ return new IndexLocation(new File("index-" + timestamp), timestamp, partitionName);
}
private RepositoryConfiguration createConfig(final int partitions) {