Posted to commits@nifi.apache.org by tk...@apache.org on 2015/12/02 04:51:15 UTC

[09/24] nifi git commit: NIFI-1054: Fixed DOS line endings in xml, java and js source files
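
Nothing in the diffs below changes functionally: converting CRLF (DOS) line endings to LF (Unix) touches every line, so each affected file appears in full as removed and then re-added with identical content. The commit does not say how the normalization was performed (tools such as dos2unix are common); purely as an illustration, a minimal Java sketch of the same idea, assuming UTF-8 sources and an in-place rewrite and using the hypothetical class name CrlfToLf, might look like:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    // Hypothetical helper, for illustration only; not part of the commit.
    public class CrlfToLf {
        public static void main(final String[] args) throws IOException {
            for (final String arg : args) {
                final Path path = Paths.get(arg);
                // Read the whole file and collapse CRLF pairs to bare LF.
                final String content = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
                final String normalized = content.replace("\r\n", "\n");
                // Rewrite the file in place only if something actually changed.
                if (!normalized.equals(content)) {
                    Files.write(path, normalized.getBytes(StandardCharsets.UTF_8));
                }
            }
        }
    }

Run as "java CrlfToLf path/to/File.java ..." to rewrite each listed file in place, which is why every line of the files below shows up as both removed and re-added.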

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
index 5822fc5..c301bf3 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -1,357 +1,357 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.hadoop;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Progressable;
-import org.apache.nifi.annotation.notification.PrimaryNodeState;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.distributed.cache.client.Deserializer;
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
-import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestListHDFS {
-
-    private TestRunner runner;
-    private ListHDFSWithMockedFileSystem proc;
-    private MockCacheClient service;
-
-    @Before
-    public void setup() throws InitializationException {
-        proc = new ListHDFSWithMockedFileSystem();
-        runner = TestRunners.newTestRunner(proc);
-
-        service = new MockCacheClient();
-        runner.addControllerService("service", service);
-        runner.enableControllerService(service);
-
-        runner.setProperty(ListHDFS.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site.xml");
-        runner.setProperty(ListHDFS.DIRECTORY, "/test");
-        runner.setProperty(ListHDFS.DISTRIBUTED_CACHE_SERVICE, "service");
-    }
-
-    @Test
-    public void testListingHasCorrectAttributes() {
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
-
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
-        final MockFlowFile mff = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
-        mff.assertAttributeEquals("path", "/test");
-        mff.assertAttributeEquals("filename", "testFile.txt");
-    }
-
-
-    @Test
-    public void testRecursive() {
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
-
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
-
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS);
-        for (int i=0; i < 2; i++) {
-            final MockFlowFile ff = flowFiles.get(i);
-            final String filename = ff.getAttribute("filename");
-
-            if (filename.equals("testFile.txt")) {
-                ff.assertAttributeEquals("path", "/test");
-            } else if ( filename.equals("1.txt")) {
-                ff.assertAttributeEquals("path", "/test/testDir");
-            } else {
-                Assert.fail("filename was " + filename);
-            }
-        }
-    }
-
-    @Test
-    public void testNotRecursive() {
-        runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false");
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
-
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
-
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
-
-        final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
-        mff1.assertAttributeEquals("path", "/test");
-        mff1.assertAttributeEquals("filename", "testFile.txt");
-    }
-
-
-    @Test
-    public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() {
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
-
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
-
-        final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
-        mff1.assertAttributeEquals("path", "/test");
-        mff1.assertAttributeEquals("filename", "testFile.txt");
-
-        runner.clearTransferState();
-
-        // add new file to pull
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));
-
-        // trigger primary node change
-        proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE);
-
-        // cause calls to service to fail
-        service.failOnCalls = true;
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
-
-        final String key = proc.getKey("/test");
-
-        // wait just a bit to ensure that the timestamp changes when we update the service
-        final Object curVal = service.values.get(key);
-        try {
-            Thread.sleep(10L);
-        } catch (final InterruptedException ie) {
-        }
-
-        service.failOnCalls = false;
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
-
-        // ensure state saved both locally & remotely
-        assertTrue(proc.localStateSaved);
-        assertNotNull(service.values.get(key));
-        assertNotSame(curVal, service.values.get(key));
-    }
-
-
-    private FsPermission create777() {
-        return new FsPermission((short) 0777);
-    }
-
-
-    private class ListHDFSWithMockedFileSystem extends ListHDFS {
-        private final MockFileSystem fileSystem = new MockFileSystem();
-        private boolean localStateSaved = false;
-
-        @Override
-        protected FileSystem getFileSystem() {
-            return fileSystem;
-        }
-
-        @Override
-        protected File getPersistenceFile() {
-            return new File("target/conf/state-file");
-        }
-
-        @Override
-        protected FileSystem getFileSystem(final Configuration config) throws IOException {
-            return fileSystem;
-        }
-
-        @Override
-        protected void persistLocalState(final String directory, final String serializedState) throws IOException {
-            super.persistLocalState(directory, serializedState);
-            localStateSaved = true;
-        }
-    }
-
-
-    private class MockFileSystem extends FileSystem {
-        private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();
-
-        public void addFileStatus(final Path parent, final FileStatus child) {
-            Set<FileStatus> children = fileStatuses.get(parent);
-            if ( children == null ) {
-                children = new HashSet<>();
-                fileStatuses.put(parent, children);
-            }
-
-            children.add(child);
-        }
-
-
-        @Override
-        public long getDefaultBlockSize() {
-            return 1024L;
-        }
-
-        @Override
-        public short getDefaultReplication() {
-            return 1;
-        }
-
-        @Override
-        public URI getUri() {
-            return null;
-        }
-
-        @Override
-        public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
-            return null;
-        }
-
-        @Override
-        public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
-                final long blockSize, final Progressable progress) throws IOException {
-            return null;
-        }
-
-        @Override
-        public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException {
-            return null;
-        }
-
-        @Override
-        public boolean rename(final Path src, final Path dst) throws IOException {
-            return false;
-        }
-
-        @Override
-        public boolean delete(final Path f, final boolean recursive) throws IOException {
-            return false;
-        }
-
-        @Override
-        public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException {
-            final Set<FileStatus> statuses = fileStatuses.get(f);
-            if ( statuses == null ) {
-                return new FileStatus[0];
-            }
-
-            return statuses.toArray(new FileStatus[statuses.size()]);
-        }
-
-        @Override
-        public void setWorkingDirectory(final Path new_dir) {
-
-        }
-
-        @Override
-        public Path getWorkingDirectory() {
-            return new Path(new File(".").getAbsolutePath());
-        }
-
-        @Override
-        public boolean mkdirs(final Path f, final FsPermission permission) throws IOException {
-            return false;
-        }
-
-        @Override
-        public FileStatus getFileStatus(final Path f) throws IOException {
-            return null;
-        }
-
-    }
-
-
-    private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
-        private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
-        private boolean failOnCalls = false;
-
-        private void verifyNotFail() throws IOException {
-            if ( failOnCalls ) {
-                throw new IOException("Could not call remote service because Unit Test marked service unavailable");
-            }
-        }
-
-        @Override
-        public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
-            verifyNotFail();
-            final Object retValue = values.putIfAbsent(key, value);
-            return (retValue == null);
-        }
-
-        @Override
-        @SuppressWarnings("unchecked")
-        public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
-                final Deserializer<V> valueDeserializer) throws IOException {
-            verifyNotFail();
-            return (V) values.putIfAbsent(key, value);
-        }
-
-        @Override
-        public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
-            verifyNotFail();
-            return values.containsKey(key);
-        }
-
-        @Override
-        public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
-            verifyNotFail();
-            values.put(key, value);
-        }
-
-        @Override
-        @SuppressWarnings("unchecked")
-        public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
-            verifyNotFail();
-            return (V) values.get(key);
-        }
-
-        @Override
-        public void close() throws IOException {
-        }
-
-        @Override
-        public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
-            verifyNotFail();
-            values.remove(key);
-            return true;
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestListHDFS {
+
+    private TestRunner runner;
+    private ListHDFSWithMockedFileSystem proc;
+    private MockCacheClient service;
+
+    @Before
+    public void setup() throws InitializationException {
+        proc = new ListHDFSWithMockedFileSystem();
+        runner = TestRunners.newTestRunner(proc);
+
+        service = new MockCacheClient();
+        runner.addControllerService("service", service);
+        runner.enableControllerService(service);
+
+        runner.setProperty(ListHDFS.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site.xml");
+        runner.setProperty(ListHDFS.DIRECTORY, "/test");
+        runner.setProperty(ListHDFS.DISTRIBUTED_CACHE_SERVICE, "service");
+    }
+
+    @Test
+    public void testListingHasCorrectAttributes() {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+        mff.assertAttributeEquals("path", "/test");
+        mff.assertAttributeEquals("filename", "testFile.txt");
+    }
+
+
+    @Test
+    public void testRecursive() {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS);
+        for (int i=0; i < 2; i++) {
+            final MockFlowFile ff = flowFiles.get(i);
+            final String filename = ff.getAttribute("filename");
+
+            if (filename.equals("testFile.txt")) {
+                ff.assertAttributeEquals("path", "/test");
+            } else if ( filename.equals("1.txt")) {
+                ff.assertAttributeEquals("path", "/test/testDir");
+            } else {
+                Assert.fail("filename was " + filename);
+            }
+        }
+    }
+
+    @Test
+    public void testNotRecursive() {
+        runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false");
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+
+        final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+        mff1.assertAttributeEquals("path", "/test");
+        mff1.assertAttributeEquals("filename", "testFile.txt");
+    }
+
+
+    @Test
+    public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+
+        final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+        mff1.assertAttributeEquals("path", "/test");
+        mff1.assertAttributeEquals("filename", "testFile.txt");
+
+        runner.clearTransferState();
+
+        // add new file to pull
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));
+
+        // trigger primary node change
+        proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE);
+
+        // cause calls to service to fail
+        service.failOnCalls = true;
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+
+        final String key = proc.getKey("/test");
+
+        // wait just a bit to ensure that the timestamp changes when we update the service
+        final Object curVal = service.values.get(key);
+        try {
+            Thread.sleep(10L);
+        } catch (final InterruptedException ie) {
+        }
+
+        service.failOnCalls = false;
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+
+        // ensure state saved both locally & remotely
+        assertTrue(proc.localStateSaved);
+        assertNotNull(service.values.get(key));
+        assertNotSame(curVal, service.values.get(key));
+    }
+
+
+    private FsPermission create777() {
+        return new FsPermission((short) 0777);
+    }
+
+
+    private class ListHDFSWithMockedFileSystem extends ListHDFS {
+        private final MockFileSystem fileSystem = new MockFileSystem();
+        private boolean localStateSaved = false;
+
+        @Override
+        protected FileSystem getFileSystem() {
+            return fileSystem;
+        }
+
+        @Override
+        protected File getPersistenceFile() {
+            return new File("target/conf/state-file");
+        }
+
+        @Override
+        protected FileSystem getFileSystem(final Configuration config) throws IOException {
+            return fileSystem;
+        }
+
+        @Override
+        protected void persistLocalState(final String directory, final String serializedState) throws IOException {
+            super.persistLocalState(directory, serializedState);
+            localStateSaved = true;
+        }
+    }
+
+
+    private class MockFileSystem extends FileSystem {
+        private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();
+
+        public void addFileStatus(final Path parent, final FileStatus child) {
+            Set<FileStatus> children = fileStatuses.get(parent);
+            if ( children == null ) {
+                children = new HashSet<>();
+                fileStatuses.put(parent, children);
+            }
+
+            children.add(child);
+        }
+
+
+        @Override
+        public long getDefaultBlockSize() {
+            return 1024L;
+        }
+
+        @Override
+        public short getDefaultReplication() {
+            return 1;
+        }
+
+        @Override
+        public URI getUri() {
+            return null;
+        }
+
+        @Override
+        public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
+            return null;
+        }
+
+        @Override
+        public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
+                final long blockSize, final Progressable progress) throws IOException {
+            return null;
+        }
+
+        @Override
+        public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException {
+            return null;
+        }
+
+        @Override
+        public boolean rename(final Path src, final Path dst) throws IOException {
+            return false;
+        }
+
+        @Override
+        public boolean delete(final Path f, final boolean recursive) throws IOException {
+            return false;
+        }
+
+        @Override
+        public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException {
+            final Set<FileStatus> statuses = fileStatuses.get(f);
+            if ( statuses == null ) {
+                return new FileStatus[0];
+            }
+
+            return statuses.toArray(new FileStatus[statuses.size()]);
+        }
+
+        @Override
+        public void setWorkingDirectory(final Path new_dir) {
+
+        }
+
+        @Override
+        public Path getWorkingDirectory() {
+            return new Path(new File(".").getAbsolutePath());
+        }
+
+        @Override
+        public boolean mkdirs(final Path f, final FsPermission permission) throws IOException {
+            return false;
+        }
+
+        @Override
+        public FileStatus getFileStatus(final Path f) throws IOException {
+            return null;
+        }
+
+    }
+
+
+    private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
+        private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
+        private boolean failOnCalls = false;
+
+        private void verifyNotFail() throws IOException {
+            if ( failOnCalls ) {
+                throw new IOException("Could not call remote service because Unit Test marked service unavailable");
+            }
+        }
+
+        @Override
+        public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+            verifyNotFail();
+            final Object retValue = values.putIfAbsent(key, value);
+            return (retValue == null);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
+                final Deserializer<V> valueDeserializer) throws IOException {
+            verifyNotFail();
+            return (V) values.putIfAbsent(key, value);
+        }
+
+        @Override
+        public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
+            verifyNotFail();
+            return values.containsKey(key);
+        }
+
+        @Override
+        public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+            verifyNotFail();
+            values.put(key, value);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
+            verifyNotFail();
+            return (V) values.get(key);
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
+            verifyNotFail();
+            values.remove(key);
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
index 31f31a5..57d0d78 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
@@ -1,512 +1,512 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.lucene;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class IndexManager implements Closeable {
-    private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);
-
-    private final Lock lock = new ReentrantLock();
-    private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
-    private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
-
-
-    public void removeIndex(final File indexDirectory) {
-        final File absoluteFile = indexDirectory.getAbsoluteFile();
-        logger.info("Removing index {}", indexDirectory);
-
-        lock.lock();
-        try {
-            final IndexWriterCount count = writerCounts.remove(absoluteFile);
-            if ( count != null ) {
-                try {
-                    count.close();
-                } catch (final IOException ioe) {
-                    logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile);
-                    if ( logger.isDebugEnabled() ) {
-                        logger.warn("", ioe);
-                    }
-                }
-            }
-
-            for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) {
-                for ( final ActiveIndexSearcher searcher : searcherList ) {
-                    try {
-                        searcher.close();
-                    } catch (final IOException ioe) {
-                        logger.warn("Failed to close Index Searcher {} for {} due to {}",
-                                searcher.getSearcher(), absoluteFile, ioe);
-                        if ( logger.isDebugEnabled() ) {
-                            logger.warn("", ioe);
-                        }
-                    }
-                }
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
-        final File absoluteFile = indexingDirectory.getAbsoluteFile();
-        logger.debug("Borrowing index writer for {}", indexingDirectory);
-
-        lock.lock();
-        try {
-            IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
-            if ( writerCount == null ) {
-                final List<Closeable> closeables = new ArrayList<>();
-                final Directory directory = FSDirectory.open(indexingDirectory);
-                closeables.add(directory);
-
-                try {
-                    final Analyzer analyzer = new StandardAnalyzer();
-                    closeables.add(analyzer);
-
-                    final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
-                    config.setWriteLockTimeout(300000L);
-
-                    final IndexWriter indexWriter = new IndexWriter(directory, config);
-                    writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1);
-                    logger.debug("Providing new index writer for {}", indexingDirectory);
-                } catch (final IOException ioe) {
-                    for ( final Closeable closeable : closeables ) {
-                        try {
-                            closeable.close();
-                        } catch (final IOException ioe2) {
-                            ioe.addSuppressed(ioe2);
-                        }
-                    }
-
-                    throw ioe;
-                }
-
-                writerCounts.put(absoluteFile, writerCount);
-
-                // Mark any active searchers as poisoned because we are updating the index
-                final List<ActiveIndexSearcher> searchers = activeSearchers.get(absoluteFile);
-                if ( searchers != null ) {
-                    for (final ActiveIndexSearcher activeSearcher : searchers) {
-                        activeSearcher.poison();
-                    }
-                }
-            } else {
-                logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
-                writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
-                        writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
-            }
-
-            return writerCount.getWriter();
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
-        final File absoluteFile = indexingDirectory.getAbsoluteFile();
-        logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory);
-
-        lock.lock();
-        try {
-            final IndexWriterCount count = writerCounts.remove(absoluteFile);
-
-            try {
-                if ( count == null ) {
-                    logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
-                            + "This could potentially lead to a resource leak", writer, indexingDirectory);
-                    writer.close();
-                } else if ( count.getCount() <= 1 ) {
-                    // we are finished with this writer.
-                    logger.debug("Closing Index Writer for {}", indexingDirectory);
-                    count.close();
-                } else {
-                    // decrement the count.
-                    logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
-                    writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
-                }
-            } catch (final IOException ioe) {
-                logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
-                if ( logger.isDebugEnabled() ) {
-                    logger.warn("", ioe);
-                }
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-
-    public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
-        final File absoluteFile = indexDir.getAbsoluteFile();
-        logger.debug("Borrowing index searcher for {}", indexDir);
-
-        lock.lock();
-        try {
-            // check if we already have a reader cached.
-            List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
-            if ( currentlyCached == null ) {
-                currentlyCached = new ArrayList<>();
-                activeSearchers.put(absoluteFile, currentlyCached);
-            } else {
-                // keep track of any searchers that have been closed so that we can remove them
-                // from our cache later.
-                final Set<ActiveIndexSearcher> expired = new HashSet<>();
-
-                try {
-                    for ( final ActiveIndexSearcher searcher : currentlyCached ) {
-                        if ( searcher.isCache() ) {
-                            // if the searcher is poisoned, we want to close and expire it.
-                            if ( searcher.isPoisoned() ) {
-                                logger.debug("Index Searcher for {} is poisoned; removing cached searcher", absoluteFile);
-                                expired.add(searcher);
-                                continue;
-                            }
-
-                            // if there are no references to the reader, it will have been closed. Since there is no
-                            // isClosed() method, this is how we determine whether it's been closed or not.
-                            final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
-                            if ( refCount <= 0 ) {
-                                // if refCount == 0, then the reader has been closed, so we need to discard the searcher
-                                logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
-                                        + "removing cached searcher", absoluteFile, refCount);
-                                expired.add(searcher);
-                                continue;
-                            }
-
-                            logger.debug("Providing previously cached index searcher for {}", indexDir);
-                            return searcher.getSearcher();
-                        }
-                    }
-                } finally {
-                    // if we have any expired index searchers, we need to close them and remove them
-                    // from the cache so that we don't try to use them again later.
-                    for ( final ActiveIndexSearcher searcher : expired ) {
-                        try {
-                            searcher.close();
-                        } catch (final Exception e) {
-                            logger.debug("Failed to close 'expired' IndexSearcher {}", searcher);
-                        }
-
-                        currentlyCached.remove(searcher);
-                    }
-                }
-            }
-
-            final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
-            if ( writerCount == null ) {
-                final Directory directory = FSDirectory.open(absoluteFile);
-                logger.debug("No Index Writer currently exists for {}; creating a cacheable reader", indexDir);
-
-                try {
-                    final DirectoryReader directoryReader = DirectoryReader.open(directory);
-                    final IndexSearcher searcher = new IndexSearcher(directoryReader);
-
-                    // we want to cache the searcher that we create, since it's just a reader.
-                    final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true);
-                    currentlyCached.add(cached);
-
-                    return cached.getSearcher();
-                } catch (final IOException e) {
-                    try {
-                        directory.close();
-                    } catch (final IOException ioe) {
-                        e.addSuppressed(ioe);
-                    }
-
-                    throw e;
-                }
-            } else {
-                logger.debug("Index Writer currently exists for {}; creating a non-cacheable reader and incrementing "
-                        + "counter to {}", indexDir, writerCount.getCount() + 1);
-
-                // increment the writer count to ensure that it's kept open.
-                writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
-                        writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
-
-                // create a new Index Searcher from the writer so that we don't have an issue with trying
-                // to read from a directory that's locked. If we get the "no segments* file found" with
-                // Lucene, this indicates that an IndexWriter already has the directory open.
-                final IndexWriter writer = writerCount.getWriter();
-                final DirectoryReader directoryReader = DirectoryReader.open(writer, false);
-                final IndexSearcher searcher = new IndexSearcher(directoryReader);
-
-                // we don't want to cache this searcher because it's based on a writer, so we want to get
-                // new values the next time that we search.
-                final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false);
-
-                currentlyCached.add(activeSearcher);
-                return activeSearcher.getSearcher();
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-
-    public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
-        final File absoluteFile = indexDirectory.getAbsoluteFile();
-        logger.debug("Returning index searcher for {} to IndexManager", indexDirectory);
-
-        lock.lock();
-        try {
-            // check if we already have a reader cached.
-            final List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
-            if ( currentlyCached == null ) {
-                logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
-                        + "result in a resource leak", indexDirectory);
-                return;
-            }
-
-            // Check if the given searcher is in our list. We use an Iterator to do this so that if we
-            // find it we can call remove() on the iterator if need be.
-            final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator();
-            while (itr.hasNext()) {
-                final ActiveIndexSearcher activeSearcher = itr.next();
-                if ( activeSearcher.getSearcher().equals(searcher) ) {
-                    if ( activeSearcher.isCache() ) {
-                        // if the searcher is poisoned, close it and remove from "pool".
-                        if ( activeSearcher.isPoisoned() ) {
-                            itr.remove();
-
-                            try {
-                                logger.debug("Closing Index Searcher for {} because it is poisoned", indexDirectory);
-                                activeSearcher.close();
-                            } catch (final IOException ioe) {
-                                logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
-                                if ( logger.isDebugEnabled() ) {
-                                    logger.warn("", ioe);
-                                }
-                            }
-
-                            return;
-                        } else {
-                            // the searcher is cached. Just leave it open.
-                            logger.debug("Index searcher for {} is cached; leaving open", indexDirectory);
-                            return;
-                        }
-                    } else {
-                        // searcher is not cached. It was created from a writer, and we want
-                        // the newest updates the next time that we get a searcher, so we will
-                        // go ahead and close this one out.
-                        itr.remove();
-
-                        // decrement the writer count because we incremented it when creating the searcher
-                        final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
-                        if ( writerCount != null ) {
-                            if ( writerCount.getCount() <= 1 ) {
-                                try {
-                                    logger.debug("Index searcher for {} is not cached. Writer count is "
-                                            + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1);
-
-                                    writerCount.close();
-                                } catch (final IOException ioe) {
-                                    logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
-                                    if ( logger.isDebugEnabled() ) {
-                                        logger.warn("", ioe);
-                                    }
-                                }
-                            } else {
-                                logger.debug("Index searcher for {} is not cached. Writer count is decremented "
-                                        + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);
-
-                                writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
-                                        writerCount.getAnalyzer(), writerCount.getDirectory(),
-                                        writerCount.getCount() - 1));
-                            }
-                        }
-
-                        try {
-                            logger.debug("Closing Index Searcher for {}", indexDirectory);
-                            activeSearcher.close();
-                        } catch (final IOException ioe) {
-                            logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
-                            if ( logger.isDebugEnabled() ) {
-                                logger.warn("", ioe);
-                            }
-                        }
-                    }
-                }
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        logger.debug("Closing Index Manager");
-
-        lock.lock();
-        try {
-            IOException ioe = null;
-
-            for ( final IndexWriterCount count : writerCounts.values() ) {
-                try {
-                    count.close();
-                } catch (final IOException e) {
-                    if ( ioe == null ) {
-                        ioe = e;
-                    } else {
-                        ioe.addSuppressed(e);
-                    }
-                }
-            }
-
-            for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) {
-                for (final ActiveIndexSearcher searcher : searcherList) {
-                    try {
-                        searcher.close();
-                    } catch (final IOException e) {
-                        if ( ioe == null ) {
-                            ioe = e;
-                        } else {
-                            ioe.addSuppressed(e);
-                        }
-                    }
-                }
-            }
-
-            if ( ioe != null ) {
-                throw ioe;
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-
-    private static void close(final Closeable... closeables) throws IOException {
-        IOException ioe = null;
-        for ( final Closeable closeable : closeables ) {
-            if ( closeable == null ) {
-                continue;
-            }
-
-            try {
-                closeable.close();
-            } catch (final IOException e) {
-                if ( ioe == null ) {
-                    ioe = e;
-                } else {
-                    ioe.addSuppressed(e);
-                }
-            }
-        }
-
-        if ( ioe != null ) {
-            throw ioe;
-        }
-    }
-
-
-    private static class ActiveIndexSearcher implements Closeable {
-        private final IndexSearcher searcher;
-        private final DirectoryReader directoryReader;
-        private final Directory directory;
-        private final boolean cache;
-        private boolean poisoned = false;
-
-        public ActiveIndexSearcher(final IndexSearcher searcher, final DirectoryReader directoryReader,
-                final Directory directory, final boolean cache) {
-            this.searcher = searcher;
-            this.directoryReader = directoryReader;
-            this.directory = directory;
-            this.cache = cache;
-        }
-
-        public boolean isCache() {
-            return cache;
-        }
-
-        public IndexSearcher getSearcher() {
-            return searcher;
-        }
-
-        public boolean isPoisoned() {
-            return poisoned;
-        }
-
-        public void poison() {
-            this.poisoned = true;
-        }
-
-        @Override
-        public void close() throws IOException {
-            IndexManager.close(directoryReader, directory);
-        }
-    }
-
-
-    private static class IndexWriterCount implements Closeable {
-        private final IndexWriter writer;
-        private final Analyzer analyzer;
-        private final Directory directory;
-        private final int count;
-
-        public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
-            this.writer = writer;
-            this.analyzer = analyzer;
-            this.directory = directory;
-            this.count = count;
-        }
-
-        public Analyzer getAnalyzer() {
-            return analyzer;
-        }
-
-        public Directory getDirectory() {
-            return directory;
-        }
-
-        public IndexWriter getWriter() {
-            return writer;
-        }
-
-        public int getCount() {
-            return count;
-        }
-
-        @Override
-        public void close() throws IOException {
-            IndexManager.close(writer, analyzer, directory);
-        }
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.lucene;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IndexManager implements Closeable {
+    private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);
+
+    private final Lock lock = new ReentrantLock();
+    private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
+    private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
+
+
+    public void removeIndex(final File indexDirectory) {
+        final File absoluteFile = indexDirectory.getAbsoluteFile();
+        logger.info("Removing index {}", indexDirectory);
+
+        lock.lock();
+        try {
+            final IndexWriterCount count = writerCounts.remove(absoluteFile);
+            if ( count != null ) {
+                try {
+                    count.close();
+                } catch (final IOException ioe) {
+                    logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile);
+                    if ( logger.isDebugEnabled() ) {
+                        logger.warn("", ioe);
+                    }
+                }
+            }
+
+            for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) {
+                for ( final ActiveIndexSearcher searcher : searcherList ) {
+                    try {
+                        searcher.close();
+                    } catch (final IOException ioe) {
+                        logger.warn("Failed to close Index Searcher {} for {} due to {}",
+                                searcher.getSearcher(), absoluteFile, ioe);
+                        if ( logger.isDebugEnabled() ) {
+                            logger.warn("", ioe);
+                        }
+                    }
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
+        final File absoluteFile = indexingDirectory.getAbsoluteFile();
+        logger.debug("Borrowing index writer for {}", indexingDirectory);
+
+        lock.lock();
+        try {
+            IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+            if ( writerCount == null ) {
+                final List<Closeable> closeables = new ArrayList<>();
+                final Directory directory = FSDirectory.open(indexingDirectory);
+                closeables.add(directory);
+
+                try {
+                    final Analyzer analyzer = new StandardAnalyzer();
+                    closeables.add(analyzer);
+
+                    final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
+                    config.setWriteLockTimeout(300000L);
+
+                    final IndexWriter indexWriter = new IndexWriter(directory, config);
+                    writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1);
+                    logger.debug("Providing new index writer for {}", indexingDirectory);
+                } catch (final IOException ioe) {
+                    for ( final Closeable closeable : closeables ) {
+                        try {
+                            closeable.close();
+                        } catch (final IOException ioe2) {
+                            ioe.addSuppressed(ioe2);
+                        }
+                    }
+
+                    throw ioe;
+                }
+
+                writerCounts.put(absoluteFile, writerCount);
+
+                // Mark any active searchers as poisoned because we are updating the index
+                final List<ActiveIndexSearcher> searchers = activeSearchers.get(absoluteFile);
+                if ( searchers != null ) {
+                    for (final ActiveIndexSearcher activeSearcher : searchers) {
+                        activeSearcher.poison();
+                    }
+                }
+            } else {
+                logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
+                writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+                        writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+            }
+
+            return writerCount.getWriter();
+        } finally {
+            lock.unlock();
+        }
+    }
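+
+    // A minimal usage sketch (names here are illustrative, not part of this class): every
+    // borrowIndexWriter call must be balanced by returnIndexWriter so that the reference
+    // count can reach zero and the writer actually be closed.
+    //
+    //   final IndexWriter writer = indexManager.borrowIndexWriter(indexDir);
+    //   try {
+    //       writer.addDocument(document); // hypothetical Lucene Document to index
+    //   } finally {
+    //       indexManager.returnIndexWriter(indexDir, writer);
+    //   }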
+
+    public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
+        final File absoluteFile = indexingDirectory.getAbsoluteFile();
+        logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory);
+
+        lock.lock();
+        try {
+            final IndexWriterCount count = writerCounts.remove(absoluteFile);
+
+            try {
+                if ( count == null ) {
+                    logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
+                            + "This could potentially lead to a resource leak", writer, indexingDirectory);
+                    writer.close();
+                } else if ( count.getCount() <= 1 ) {
+                    // we are finished with this writer.
+                    logger.debug("Closing Index Writer for {}", indexingDirectory);
+                    count.close();
+                } else {
+                    // decrement the count.
+                    logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
+                    writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
+                }
+            } catch (final IOException ioe) {
+                logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
+                if ( logger.isDebugEnabled() ) {
+                    logger.warn("", ioe);
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
+        final File absoluteFile = indexDir.getAbsoluteFile();
+        logger.debug("Borrowing index searcher for {}", indexDir);
+
+        lock.lock();
+        try {
+            // check if we already have a reader cached.
+            List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
+            if ( currentlyCached == null ) {
+                currentlyCached = new ArrayList<>();
+                activeSearchers.put(absoluteFile, currentlyCached);
+            } else {
+                // keep track of any searchers that have been closed so that we can remove them
+                // from our cache later.
+                final Set<ActiveIndexSearcher> expired = new HashSet<>();
+
+                try {
+                    for ( final ActiveIndexSearcher searcher : currentlyCached ) {
+                        if ( searcher.isCache() ) {
+                            // if the searcher is poisoned, we want to close and expire it.
+                            if ( searcher.isPoisoned() ) {
+                                logger.debug("Index Searcher for {} is poisoned; removing cached searcher", absoluteFile);
+                                expired.add(searcher);
+                                continue;
+                            }
+
+                            // if there are no references to the reader, it will have been closed. Since there is no
+                            // isClosed() method, this is how we determine whether it's been closed or not.
+                            final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
+                            if ( refCount <= 0 ) {
+                                // if refCount == 0, then the reader has been closed, so we need to discard the searcher
+                                logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
+                                        + "removing cached searcher", absoluteFile, refCount);
+                                expired.add(searcher);
+                                continue;
+                            }
+
+                            logger.debug("Providing previously cached index searcher for {}", indexDir);
+                            return searcher.getSearcher();
+                        }
+                    }
+                } finally {
+                    // if we have any expired index searchers, we need to close them and remove them
+                    // from the cache so that we don't try to use them again later.
+                    for ( final ActiveIndexSearcher searcher : expired ) {
+                        try {
+                            searcher.close();
+                        } catch (final Exception e) {
+                            logger.debug("Failed to close 'expired' IndexSearcher {}", searcher);
+                        }
+
+                        currentlyCached.remove(searcher);
+                    }
+                }
+            }
+
+            final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+            if ( writerCount == null ) {
+                final Directory directory = FSDirectory.open(absoluteFile);
+                logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);
+
+                try {
+                    final DirectoryReader directoryReader = DirectoryReader.open(directory);
+                    final IndexSearcher searcher = new IndexSearcher(directoryReader);
+
+                    // we want to cache the searcher that we create, since it's just a reader.
+                    final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true);
+                    currentlyCached.add(cached);
+
+                    return cached.getSearcher();
+                } catch (final IOException e) {
+                    try {
+                        directory.close();
+                    } catch (final IOException ioe) {
+                        e.addSuppressed(ioe);
+                    }
+
+                    throw e;
+                }
+            } else {
+                logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing "
+                        + "counter to {}", indexDir, writerCount.getCount() + 1);
+
+                // increment the writer count to ensure that it's kept open.
+                writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+                        writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+
+                // create a new Index Searcher from the writer so that we don't have an issue with trying
+                // to read from a directory that's locked. If we get the "no segments* file found" with
+                // Lucene, this indicates that an IndexWriter already has the directory open.
+                final IndexWriter writer = writerCount.getWriter();
+                final DirectoryReader directoryReader = DirectoryReader.open(writer, false);
+                final IndexSearcher searcher = new IndexSearcher(directoryReader);
+
+                // we don't want to cache this searcher because it's based on a writer, so we want to get
+                // new values the next time that we search.
+                final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false);
+
+                currentlyCached.add(activeSearcher);
+                return activeSearcher.getSearcher();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
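+
+    // Borrowed searchers follow the same borrow/return discipline (sketch; 'query' is an
+    // illustrative Lucene Query, not defined in this class):
+    //
+    //   final IndexSearcher searcher = indexManager.borrowIndexSearcher(indexDir);
+    //   try {
+    //       final TopDocs topDocs = searcher.search(query, 1000);
+    //   } finally {
+    //       indexManager.returnIndexSearcher(indexDir, searcher);
+    //   }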
+
+
+    public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
+        final File absoluteFile = indexDirectory.getAbsoluteFile();
+        logger.debug("Returning index searcher for {} to IndexManager", indexDirectory);
+
+        lock.lock();
+        try {
+            // check if we already have a reader cached.
+            final List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
+            if ( currentlyCached == null ) {
+                logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
+                        + "result in a resource leak", indexDirectory);
+                return;
+            }
+
+            // Check if the given searcher is in our list. We use an Iterator to do this so that if we
+            // find it we can call remove() on the iterator if need be.
+            final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator();
+            while (itr.hasNext()) {
+                final ActiveIndexSearcher activeSearcher = itr.next();
+                if ( activeSearcher.getSearcher().equals(searcher) ) {
+                    if ( activeSearcher.isCache() ) {
+                        // if the searcher is poisoned, close it and remove from "pool".
+                        if ( activeSearcher.isPoisoned() ) {
+                            itr.remove();
+
+                            try {
+                                logger.debug("Closing Index Searcher for {} because it is poisoned", indexDirectory);
+                                activeSearcher.close();
+                            } catch (final IOException ioe) {
+                                logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
+                                if ( logger.isDebugEnabled() ) {
+                                    logger.warn("", ioe);
+                                }
+                            }
+
+                            return;
+                        } else {
+                            // the searcher is cached. Just leave it open.
+                            logger.debug("Index searcher for {} is cached; leaving open", indexDirectory);
+                            return;
+                        }
+                    } else {
+                        // searcher is not cached. It was created from a writer, and we want
+                        // the newest updates the next time that we get a searcher, so we will
+                        // go ahead and close this one out.
+                        itr.remove();
+
+                        // decrement the writer count because we incremented it when creating the searcher
+                        final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+                        if ( writerCount != null ) {
+                            if ( writerCount.getCount() <= 1 ) {
+                                try {
+                                    logger.debug("Index searcher for {} is not cached. Writer count is "
+                                            + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1);
+
+                                    writerCount.close();
+                                } catch (final IOException ioe) {
+                                    logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
+                                    if ( logger.isDebugEnabled() ) {
+                                        logger.warn("", ioe);
+                                    }
+                                }
+                            } else {
+                                logger.debug("Index searcher for {} is not cached. Writer count is decremented "
+                                        + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);
+
+                                writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+                                        writerCount.getAnalyzer(), writerCount.getDirectory(),
+                                        writerCount.getCount() - 1));
+                            }
+                        }
+
+                        try {
+                            logger.debug("Closing Index Searcher for {}", indexDirectory);
+                            activeSearcher.close();
+                        } catch (final IOException ioe) {
+                            logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
+                            if ( logger.isDebugEnabled() ) {
+                                logger.warn("", ioe);
+                            }
+                        }
+                    }
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        logger.debug("Closing Index Manager");
+
+        lock.lock();
+        try {
+            IOException ioe = null;
+
+            for ( final IndexWriterCount count : writerCounts.values() ) {
+                try {
+                    count.close();
+                } catch (final IOException e) {
+                    if ( ioe == null ) {
+                        ioe = e;
+                    } else {
+                        ioe.addSuppressed(e);
+                    }
+                }
+            }
+
+            for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) {
+                for (final ActiveIndexSearcher searcher : searcherList) {
+                    try {
+                        searcher.close();
+                    } catch (final IOException e) {
+                        if ( ioe == null ) {
+                            ioe = e;
+                        } else {
+                            ioe.addSuppressed(e);
+                        }
+                    }
+                }
+            }
+
+            if ( ioe != null ) {
+                throw ioe;
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    private static void close(final Closeable... closeables) throws IOException {
+        IOException ioe = null;
+        for ( final Closeable closeable : closeables ) {
+            if ( closeable == null ) {
+                continue;
+            }
+
+            try {
+                closeable.close();
+            } catch (final IOException e) {
+                if ( ioe == null ) {
+                    ioe = e;
+                } else {
+                    ioe.addSuppressed(e);
+                }
+            }
+        }
+
+        if ( ioe != null ) {
+            throw ioe;
+        }
+    }
+
+
+    private static class ActiveIndexSearcher implements Closeable {
+        private final IndexSearcher searcher;
+        private final DirectoryReader directoryReader;
+        private final Directory directory;
+        private final boolean cache;
+        private boolean poisoned = false;
+
+        public ActiveIndexSearcher(final IndexSearcher searcher, final DirectoryReader directoryReader,
+                final Directory directory, final boolean cache) {
+            this.searcher = searcher;
+            this.directoryReader = directoryReader;
+            this.directory = directory;
+            this.cache = cache;
+        }
+
+        public boolean isCache() {
+            return cache;
+        }
+
+        public IndexSearcher getSearcher() {
+            return searcher;
+        }
+
+        public boolean isPoisoned() {
+            return poisoned;
+        }
+
+        public void poison() {
+            this.poisoned = true;
+        }
+
+        @Override
+        public void close() throws IOException {
+            IndexManager.close(directoryReader, directory);
+        }
+    }
+
+
+    private static class IndexWriterCount implements Closeable {
+        private final IndexWriter writer;
+        private final Analyzer analyzer;
+        private final Directory directory;
+        private final int count;
+
+        public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
+            this.writer = writer;
+            this.analyzer = analyzer;
+            this.directory = directory;
+            this.count = count;
+        }
+
+        public Analyzer getAnalyzer() {
+            return analyzer;
+        }
+
+        public Directory getDirectory() {
+            return directory;
+        }
+
+        public IndexWriter getWriter() {
+            return writer;
+        }
+
+        public int getCount() {
+            return count;
+        }
+
+        @Override
+        public void close() throws IOException {
+            IndexManager.close(writer, analyzer, directory);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
index 61f86e7..60328fa 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
@@ -1,155 +1,155 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.toc;
-
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-
-/**
- * Standard implementation of TocReader.
- *
- * Expects the .toc file to be in the following format:
- *
- * byte 0: version
- * byte 1: boolean: compressionFlag -> 0 = journal is NOT compressed, 1 = journal is compressed
- * byte 2-9: long: offset of block 0
- * byte 10-17: long: offset of block 1
- * ...
- * byte (N*8+2)-(N*8+9): long: offset of block N
- */
-public class StandardTocReader implements TocReader {
-    private final boolean compressed;
-    private final long[] offsets;
-    private final long[] firstEventIds;
-
-    public StandardTocReader(final File file) throws IOException {
-        try (final FileInputStream fis = new FileInputStream(file);
-                final DataInputStream dis = new DataInputStream(fis)) {
-
-            final int version = dis.read();
-            if ( version < 0 ) {
-                throw new EOFException();
-            }
-
-            final int compressionFlag = dis.read();
-            if ( compressionFlag < 0 ) {
-                throw new EOFException();
-            }
-
-            if ( compressionFlag == 0 ) {
-                compressed = false;
-            } else if ( compressionFlag == 1 ) {
-                compressed = true;
-            } else {
-                throw new IOException("Table of Contents appears to be corrupt: could not read 'compression flag' from header; expected value of 0 or 1 but got " + compressionFlag);
-            }
-
-            final int blockInfoBytes;
-            switch (version) {
-                case 1:
-                    blockInfoBytes = 8;
-                    break;
-                case 2:
-                default:
-                    blockInfoBytes = 16;
-                    break;
-            }
-
-            final int numBlocks = (int) ((file.length() - 2) / blockInfoBytes);
-            offsets = new long[numBlocks];
-
-            if ( version > 1 ) {
-                firstEventIds = new long[numBlocks];
-            } else {
-                firstEventIds = new long[0];
-            }
-
-            for (int i=0; i < numBlocks; i++) {
-                offsets[i] = dis.readLong();
-
-                if ( version > 1 ) {
-                    firstEventIds[i] = dis.readLong();
-                }
-            }
-        }
-    }
-
-    @Override
-    public boolean isCompressed() {
-        return compressed;
-    }
-
-    @Override
-    public long getBlockOffset(final int blockIndex) {
-        if ( blockIndex >= offsets.length ) {
-            return -1L;
-        }
-        return offsets[blockIndex];
-    }
-
-    @Override
-    public long getLastBlockOffset() {
-        if ( offsets.length == 0 ) {
-            return 0L;
-        }
-        return offsets[offsets.length - 1];
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-
-    @Override
-    public int getBlockIndex(final long blockOffset) {
-        for (int i=0; i < offsets.length; i++) {
-            if ( offsets[i] > blockOffset ) {
-                // if the offset is less than the offset of our first block,
-                // just return 0 to indicate the first block. Otherwise,
-                // return i-1 because i represents the first block whose offset is
-                // greater than 'blockOffset'.
-                return (i == 0) ? 0 : i-1;
-            }
-        }
-
-        // None of the blocks have an offset greater than the provided offset.
-        // Therefore, if the event is present, it must be in the last block.
-        return offsets.length - 1;
-    }
-
-    @Override
-    public Integer getBlockIndexForEventId(final long eventId) {
-        // if we don't have event IDs stored in the TOC (which happens for version 1 of the TOC),
-        // or if the event ID is less than the first Event ID in this TOC, then the Event ID
-        // is unknown -- return null.
-        if ( firstEventIds.length == 0 || eventId < firstEventIds[0] ) {
-            return null;
-        }
-
-        for (int i=1; i < firstEventIds.length; i++) {
-            if ( firstEventIds[i] > eventId ) {
-                return i-1;
-            }
-        }
-
-        // None of the blocks start with an Event ID greater than the provided ID.
-        // Therefore, if the event is present, it must be in the last block.
-        return firstEventIds.length - 1;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+/**
+ * Standard implementation of TocReader.
+ *
+ * Expects the .toc file to be in the following format:
+ *
+ * byte 0: version
+ * byte 1: boolean: compressionFlag -> 0 = journal is NOT compressed, 1 = journal is compressed
+ * byte 2-9: long: offset of block 0
+ * byte 10-17: long: offset of block 1
+ * ...
+ * byte (N*8+2)-(N*8+9): long: offset of block N
+ *
+ * For version 2 and later, each block entry is 16 bytes rather than 8: the 8-byte block offset
+ * is followed by the 8-byte ID of the first event in that block.
+ */
+public class StandardTocReader implements TocReader {
+    private final boolean compressed;
+    private final long[] offsets;
+    private final long[] firstEventIds;
+
+    public StandardTocReader(final File file) throws IOException {
+        try (final FileInputStream fis = new FileInputStream(file);
+                final DataInputStream dis = new DataInputStream(fis)) {
+
+            final int version = dis.read();
+            if ( version < 0 ) {
+                throw new EOFException();
+            }
+
+            final int compressionFlag = dis.read();
+            if ( compressionFlag < 0 ) {
+                throw new EOFException();
+            }
+
+            if ( compressionFlag == 0 ) {
+                compressed = false;
+            } else if ( compressionFlag == 1 ) {
+                compressed = true;
+            } else {
+                throw new IOException("Table of Contents appears to be corrupt: could not read 'compression flag' from header; expected value of 0 or 1 but got " + compressionFlag);
+            }
+
+            final int blockInfoBytes;
+            switch (version) {
+                case 1:
+                    blockInfoBytes = 8;
+                    break;
+                case 2:
+                default:
+                    blockInfoBytes = 16;
+                    break;
+            }
+
+            final int numBlocks = (int) ((file.length() - 2) / blockInfoBytes);
+            offsets = new long[numBlocks];
+
+            if ( version > 1 ) {
+                firstEventIds = new long[numBlocks];
+            } else {
+                firstEventIds = new long[0];
+            }
+
+            for (int i=0; i < numBlocks; i++) {
+                offsets[i] = dis.readLong();
+
+                if ( version > 1 ) {
+                    firstEventIds[i] = dis.readLong();
+                }
+            }
+        }
+    }
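+
+    // Example of the size arithmetic above: a version-2 TOC file of 34 bytes has
+    // (34 - 2) / 16 = 2 blocks, each contributing one offset and one first-event ID.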
+
+    @Override
+    public boolean isCompressed() {
+        return compressed;
+    }
+
+    @Override
+    public long getBlockOffset(final int blockIndex) {
+        if ( blockIndex >= offsets.length ) {
+            return -1L;
+        }
+        return offsets[blockIndex];
+    }
+
+    @Override
+    public long getLastBlockOffset() {
+        if ( offsets.length == 0 ) {
+            return 0L;
+        }
+        return offsets[offsets.length - 1];
+    }
+
+    @Override
+    public void close() throws IOException {
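+        // Nothing to close here: the constructor consumes and closes the underlying
+        // streams via try-with-resources, so no resources remain open.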
+    }
+
+    @Override
+    public int getBlockIndex(final long blockOffset) {
+        for (int i=0; i < offsets.length; i++) {
+            if ( offsets[i] > blockOffset ) {
+                // if the offset is less than the offset of our first block,
+                // just return 0 to indicate the first block. Otherwise,
+                // return i-1 because i represents the first block whose offset is
+                // greater than 'blockOffset'.
+                return (i == 0) ? 0 : i-1;
+            }
+        }
+
+        // None of the blocks have an offset greater than the provided offset.
+        // Therefore, if the event is present, it must be in the last block.
+        return offsets.length - 1;
+    }
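+
+    // For example, with offsets {0, 100, 200}, getBlockIndex(150) returns 1 because block 1
+    // starts at offset 100 and block 2 is the first block starting beyond offset 150.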
+
+    @Override
+    public Integer getBlockIndexForEventId(final long eventId) {
+        // if we don't have event IDs stored in the TOC (which happens for version 1 of the TOC),
+        // or if the event ID is less than the first Event ID in this TOC, then the Event ID
+        // is unknown -- return null.
+        if ( firstEventIds.length == 0 || eventId < firstEventIds[0] ) {
+            return null;
+        }
+
+        for (int i=1; i < firstEventIds.length; i++) {
+            if ( firstEventIds[i] > eventId ) {
+                return i-1;
+            }
+        }
+
+        // None of the blocks start with an Event ID greater than the provided ID.
+        // Therefore, if the event is present, it must be in the last block.
+        return firstEventIds.length - 1;
+    }
+}