You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2017/04/07 18:46:28 UTC

nifi git commit: NIFI-3631: This closes #1613. Do not change the active index directory in IndexDirectoryManager when it becomes full. Instead, wait until it is committed and is removed by the onIndexCommitted method. This resolves a bug where the index can exceed the configured limit before it is committed, which leaked the IndexWriter.

Repository: nifi
Updated Branches:
  refs/heads/master a5d630672 -> 778ba3957


NIFI-3631: This closes #1613. Do not change the active index directory in IndexDirectoryManager when it becomes full. Instead, wait until the index is committed and is removed by the onIndexCommitted method. This resolves a bug where an index could exceed the configured size limit without yet being committed and, as a result, would no longer be the active index. Because of this, the IndexWriter was never closed or removed from the IndexManager, which created a memory leak.

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/778ba395
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/778ba395
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/778ba395

Branch: refs/heads/master
Commit: 778ba3957e7d1b9ddc105b1655f53e85cf6ec2ab
Parents: a5d6306
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Mar 20 17:18:01 2017 -0400
Committer: joewitt <jo...@apache.org>
Committed: Fri Apr 7 14:42:18 2017 -0400

----------------------------------------------------------------------
 .../index/lucene/IndexDirectoryManager.java     |  9 ++--
 .../provenance/index/lucene/IndexLocation.java  | 21 +-------
 .../index/lucene/TestIndexDirectoryManager.java | 51 +++++++++++++++++++-
 3 files changed, 55 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/778ba395/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java
index 09878ff..53f74e0 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java
@@ -75,7 +75,7 @@ public class IndexDirectoryManager {
 
                 final long startTime = DirectoryUtils.getIndexTimestamp(indexDir);
                 final List<IndexLocation> dirsForTimestamp = indexLocationByTimestamp.computeIfAbsent(startTime, t -> new ArrayList<>());
-                final IndexLocation indexLoc = new IndexLocation(indexDir, startTime, partitionName, repoConfig.getDesiredIndexSize());
+                final IndexLocation indexLoc = new IndexLocation(indexDir, startTime, partitionName);
                 dirsForTimestamp.add(indexLoc);
 
                 final Tuple<Long, IndexLocation> tuple = latestIndexByStorageDir.get(storageDir);
@@ -99,8 +99,7 @@ public class IndexDirectoryManager {
             final Map.Entry<Long, List<IndexLocation>> entry = itr.next();
             final List<IndexLocation> locations = entry.getValue();
 
-            final IndexLocation locToRemove = new IndexLocation(directory, DirectoryUtils.getIndexTimestamp(directory),
-                directory.getName(), repoConfig.getDesiredIndexSize());
+            final IndexLocation locToRemove = new IndexLocation(directory, DirectoryUtils.getIndexTimestamp(directory), directory.getName());
             locations.remove(locToRemove);
             if (locations.isEmpty()) {
                 itr.remove();
@@ -334,8 +333,8 @@ public class IndexDirectoryManager {
      */
     public synchronized File getWritableIndexingDirectory(final long earliestTimestamp, final String partitionName) {
         IndexLocation indexLoc = activeIndices.get(partitionName);
-        if (indexLoc == null || indexLoc.isIndexFull()) {
-            indexLoc = new IndexLocation(createIndex(earliestTimestamp, partitionName), earliestTimestamp, partitionName, repoConfig.getDesiredIndexSize());
+        if (indexLoc == null) {
+            indexLoc = new IndexLocation(createIndex(earliestTimestamp, partitionName), earliestTimestamp, partitionName);
             logger.debug("Created new Index Directory {}", indexLoc);
 
             indexLocationByTimestamp.computeIfAbsent(earliestTimestamp, t -> new ArrayList<>()).add(indexLoc);

http://git-wip-us.apache.org/repos/asf/nifi/blob/778ba395/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java
index 33867c6..f7de84f 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java
@@ -18,24 +18,16 @@
 package org.apache.nifi.provenance.index.lucene;
 
 import java.io.File;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.provenance.util.DirectoryUtils;
 
 public class IndexLocation {
-    private static final long SIZE_CHECK_MILLIS = TimeUnit.SECONDS.toMillis(30L);
-
     private final File indexDirectory;
     private final long indexStartTimestamp;
     private final String partitionName;
-    private final long desiredIndexSize;
-    private volatile long lastSizeCheckTime = System.currentTimeMillis();
 
-    public IndexLocation(final File indexDirectory, final long indexStartTimestamp, final String partitionName, final long desiredIndexSize) {
+    public IndexLocation(final File indexDirectory, final long indexStartTimestamp, final String partitionName) {
         this.indexDirectory = indexDirectory;
         this.indexStartTimestamp = indexStartTimestamp;
         this.partitionName = partitionName;
-        this.desiredIndexSize = desiredIndexSize;
     }
 
     public File getIndexDirectory() {
@@ -50,17 +42,6 @@ public class IndexLocation {
         return partitionName;
     }
 
-    public boolean isIndexFull() {
-        final long now = System.currentTimeMillis();
-        final long millisSinceLastSizeCheck = now - lastSizeCheckTime;
-        if (millisSinceLastSizeCheck < SIZE_CHECK_MILLIS) {
-            return false;
-        }
-
-        lastSizeCheckTime = now;
-        return DirectoryUtils.getSize(indexDirectory) >= desiredIndexSize;
-    }
-
     @Override
     public int hashCode() {
         return 31 + 41 * indexDirectory.hashCode();

http://git-wip-us.apache.org/repos/asf/nifi/blob/778ba395/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java
index 3f3c422..efcb601 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java
@@ -21,6 +21,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -82,12 +85,58 @@ public class TestIndexDirectoryManager {
     }
 
 
+    @Test
+    public void testActiveIndexNotLostWhenSizeExceeded() throws IOException, InterruptedException {
+        final RepositoryConfiguration config = createConfig(2);
+        config.setDesiredIndexSize(4096 * 128);
+
+        final File storageDir1 = config.getStorageDirectories().get("1");
+        final File storageDir2 = config.getStorageDirectories().get("2");
+
+        final File index1 = new File(storageDir1, "index-1");
+        final File index2 = new File(storageDir1, "index-2");
+        final File index3 = new File(storageDir2, "index-3");
+        final File index4 = new File(storageDir2, "index-4");
+
+        final File[] allIndices = new File[] {index1, index2, index3, index4};
+        for (final File file : allIndices) {
+            assertTrue(file.mkdirs() || file.exists());
+        }
+
+        try {
+            final IndexDirectoryManager mgr = new IndexDirectoryManager(config);
+            mgr.initialize();
+
+            File indexDir = mgr.getWritableIndexingDirectory(System.currentTimeMillis(), "1");
+            final File newFile = new File(indexDir, "1.bin");
+            try (final OutputStream fos = new FileOutputStream(newFile)) {
+                final byte[] data = new byte[4096];
+                for (int i = 0; i < 1024; i++) {
+                    fos.write(data);
+                }
+            }
+
+            try {
+                final File newDir = mgr.getWritableIndexingDirectory(System.currentTimeMillis(), "1");
+                assertEquals(indexDir, newDir);
+            } finally {
+                newFile.delete();
+            }
+        } finally {
+            for (final File file : allIndices) {
+                file.delete();
+            }
+        }
+    }
+
+
+
     private IndexLocation createLocation(final long timestamp) {
         return createLocation(timestamp, "1");
     }
 
     private IndexLocation createLocation(final long timestamp, final String partitionName) {
-        return new IndexLocation(new File("index-" + timestamp), timestamp, partitionName, 1024 * 1024L);
+        return new IndexLocation(new File("index-" + timestamp), timestamp, partitionName);
     }
 
     private RepositoryConfiguration createConfig(final int partitions) {