You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/12/02 04:51:15 UTC
[09/24] nifi git commit: NIFI-1054: Fixed DOS line endings in xml,
java and js source files
http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
index 5822fc5..c301bf3 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -1,357 +1,357 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.hadoop;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Progressable;
-import org.apache.nifi.annotation.notification.PrimaryNodeState;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.distributed.cache.client.Deserializer;
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
-import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestListHDFS {
-
- private TestRunner runner;
- private ListHDFSWithMockedFileSystem proc;
- private MockCacheClient service;
-
- @Before
- public void setup() throws InitializationException {
- proc = new ListHDFSWithMockedFileSystem();
- runner = TestRunners.newTestRunner(proc);
-
- service = new MockCacheClient();
- runner.addControllerService("service", service);
- runner.enableControllerService(service);
-
- runner.setProperty(ListHDFS.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site.xml");
- runner.setProperty(ListHDFS.DIRECTORY, "/test");
- runner.setProperty(ListHDFS.DISTRIBUTED_CACHE_SERVICE, "service");
- }
-
- @Test
- public void testListingHasCorrectAttributes() {
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
-
- runner.run();
-
- runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
- final MockFlowFile mff = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
- mff.assertAttributeEquals("path", "/test");
- mff.assertAttributeEquals("filename", "testFile.txt");
- }
-
-
- @Test
- public void testRecursive() {
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
-
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
- proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
-
- runner.run();
-
- runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
-
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS);
- for (int i=0; i < 2; i++) {
- final MockFlowFile ff = flowFiles.get(i);
- final String filename = ff.getAttribute("filename");
-
- if (filename.equals("testFile.txt")) {
- ff.assertAttributeEquals("path", "/test");
- } else if ( filename.equals("1.txt")) {
- ff.assertAttributeEquals("path", "/test/testDir");
- } else {
- Assert.fail("filename was " + filename);
- }
- }
- }
-
- @Test
- public void testNotRecursive() {
- runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false");
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
-
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
- proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
-
- runner.run();
-
- runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
-
- final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
- mff1.assertAttributeEquals("path", "/test");
- mff1.assertAttributeEquals("filename", "testFile.txt");
- }
-
-
- @Test
- public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() {
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
-
- runner.run();
-
- runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
-
- final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
- mff1.assertAttributeEquals("path", "/test");
- mff1.assertAttributeEquals("filename", "testFile.txt");
-
- runner.clearTransferState();
-
- // add new file to pull
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));
-
- // trigger primary node change
- proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE);
-
- // cause calls to service to fail
- service.failOnCalls = true;
-
- runner.run();
- runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
-
- runner.run();
- runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
-
- final String key = proc.getKey("/test");
-
- // wait just to a bit to ensure that the timestamp changes when we update the service
- final Object curVal = service.values.get(key);
- try {
- Thread.sleep(10L);
- } catch (final InterruptedException ie) {
- }
-
- service.failOnCalls = false;
- runner.run();
- runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
-
- // ensure state saved both locally & remotely
- assertTrue(proc.localStateSaved);
- assertNotNull(service.values.get(key));
- assertNotSame(curVal, service.values.get(key));
- }
-
-
- private FsPermission create777() {
- return new FsPermission((short) 0777);
- }
-
-
- private class ListHDFSWithMockedFileSystem extends ListHDFS {
- private final MockFileSystem fileSystem = new MockFileSystem();
- private boolean localStateSaved = false;
-
- @Override
- protected FileSystem getFileSystem() {
- return fileSystem;
- }
-
- @Override
- protected File getPersistenceFile() {
- return new File("target/conf/state-file");
- }
-
- @Override
- protected FileSystem getFileSystem(final Configuration config) throws IOException {
- return fileSystem;
- }
-
- @Override
- protected void persistLocalState(final String directory, final String serializedState) throws IOException {
- super.persistLocalState(directory, serializedState);
- localStateSaved = true;
- }
- }
-
-
- private class MockFileSystem extends FileSystem {
- private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();
-
- public void addFileStatus(final Path parent, final FileStatus child) {
- Set<FileStatus> children = fileStatuses.get(parent);
- if ( children == null ) {
- children = new HashSet<>();
- fileStatuses.put(parent, children);
- }
-
- children.add(child);
- }
-
-
- @Override
- public long getDefaultBlockSize() {
- return 1024L;
- }
-
- @Override
- public short getDefaultReplication() {
- return 1;
- }
-
- @Override
- public URI getUri() {
- return null;
- }
-
- @Override
- public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
- return null;
- }
-
- @Override
- public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
- final long blockSize, final Progressable progress) throws IOException {
- return null;
- }
-
- @Override
- public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException {
- return null;
- }
-
- @Override
- public boolean rename(final Path src, final Path dst) throws IOException {
- return false;
- }
-
- @Override
- public boolean delete(final Path f, final boolean recursive) throws IOException {
- return false;
- }
-
- @Override
- public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException {
- final Set<FileStatus> statuses = fileStatuses.get(f);
- if ( statuses == null ) {
- return new FileStatus[0];
- }
-
- return statuses.toArray(new FileStatus[statuses.size()]);
- }
-
- @Override
- public void setWorkingDirectory(final Path new_dir) {
-
- }
-
- @Override
- public Path getWorkingDirectory() {
- return new Path(new File(".").getAbsolutePath());
- }
-
- @Override
- public boolean mkdirs(final Path f, final FsPermission permission) throws IOException {
- return false;
- }
-
- @Override
- public FileStatus getFileStatus(final Path f) throws IOException {
- return null;
- }
-
- }
-
-
- private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
- private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
- private boolean failOnCalls = false;
-
- private void verifyNotFail() throws IOException {
- if ( failOnCalls ) {
- throw new IOException("Could not call to remote service because Unit Test marked service unavailable");
- }
- }
-
- @Override
- public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
- verifyNotFail();
- final Object retValue = values.putIfAbsent(key, value);
- return (retValue == null);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
- final Deserializer<V> valueDeserializer) throws IOException {
- verifyNotFail();
- return (V) values.putIfAbsent(key, value);
- }
-
- @Override
- public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
- verifyNotFail();
- return values.containsKey(key);
- }
-
- @Override
- public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
- verifyNotFail();
- values.put(key, value);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
- verifyNotFail();
- return (V) values.get(key);
- }
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
- verifyNotFail();
- values.remove(key);
- return true;
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestListHDFS {
+
+ private TestRunner runner;
+ private ListHDFSWithMockedFileSystem proc;
+ private MockCacheClient service;
+
+ @Before
+ public void setup() throws InitializationException {
+ proc = new ListHDFSWithMockedFileSystem();
+ runner = TestRunners.newTestRunner(proc);
+
+ service = new MockCacheClient();
+ runner.addControllerService("service", service);
+ runner.enableControllerService(service);
+
+ runner.setProperty(ListHDFS.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site.xml");
+ runner.setProperty(ListHDFS.DIRECTORY, "/test");
+ runner.setProperty(ListHDFS.DISTRIBUTED_CACHE_SERVICE, "service");
+ }
+
+ @Test
+ public void testListingHasCorrectAttributes() {
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+ final MockFlowFile mff = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+ mff.assertAttributeEquals("path", "/test");
+ mff.assertAttributeEquals("filename", "testFile.txt");
+ }
+
+
+ @Test
+ public void testRecursive() {
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
+ proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
+
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS);
+ for (int i=0; i < 2; i++) {
+ final MockFlowFile ff = flowFiles.get(i);
+ final String filename = ff.getAttribute("filename");
+
+ if (filename.equals("testFile.txt")) {
+ ff.assertAttributeEquals("path", "/test");
+ } else if ( filename.equals("1.txt")) {
+ ff.assertAttributeEquals("path", "/test/testDir");
+ } else {
+ Assert.fail("filename was " + filename);
+ }
+ }
+ }
+
+ @Test
+ public void testNotRecursive() {
+ runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false");
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
+ proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+
+ final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+ mff1.assertAttributeEquals("path", "/test");
+ mff1.assertAttributeEquals("filename", "testFile.txt");
+ }
+
+
+ @Test
+ public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() {
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+
+ final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+ mff1.assertAttributeEquals("path", "/test");
+ mff1.assertAttributeEquals("filename", "testFile.txt");
+
+ runner.clearTransferState();
+
+ // add new file to pull
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));
+
+ // trigger primary node change
+ proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE);
+
+ // cause calls to service to fail
+ service.failOnCalls = true;
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+
+ final String key = proc.getKey("/test");
+
+ // wait just to a bit to ensure that the timestamp changes when we update the service
+ final Object curVal = service.values.get(key);
+ try {
+ Thread.sleep(10L);
+ } catch (final InterruptedException ie) {
+ }
+
+ service.failOnCalls = false;
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+
+ // ensure state saved both locally & remotely
+ assertTrue(proc.localStateSaved);
+ assertNotNull(service.values.get(key));
+ assertNotSame(curVal, service.values.get(key));
+ }
+
+
+ private FsPermission create777() {
+ return new FsPermission((short) 0777);
+ }
+
+
+ private class ListHDFSWithMockedFileSystem extends ListHDFS {
+ private final MockFileSystem fileSystem = new MockFileSystem();
+ private boolean localStateSaved = false;
+
+ @Override
+ protected FileSystem getFileSystem() {
+ return fileSystem;
+ }
+
+ @Override
+ protected File getPersistenceFile() {
+ return new File("target/conf/state-file");
+ }
+
+ @Override
+ protected FileSystem getFileSystem(final Configuration config) throws IOException {
+ return fileSystem;
+ }
+
+ @Override
+ protected void persistLocalState(final String directory, final String serializedState) throws IOException {
+ super.persistLocalState(directory, serializedState);
+ localStateSaved = true;
+ }
+ }
+
+
+ private class MockFileSystem extends FileSystem {
+ private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();
+
+ public void addFileStatus(final Path parent, final FileStatus child) {
+ Set<FileStatus> children = fileStatuses.get(parent);
+ if ( children == null ) {
+ children = new HashSet<>();
+ fileStatuses.put(parent, children);
+ }
+
+ children.add(child);
+ }
+
+
+ @Override
+ public long getDefaultBlockSize() {
+ return 1024L;
+ }
+
+ @Override
+ public short getDefaultReplication() {
+ return 1;
+ }
+
+ @Override
+ public URI getUri() {
+ return null;
+ }
+
+ @Override
+ public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
+ return null;
+ }
+
+ @Override
+ public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
+ final long blockSize, final Progressable progress) throws IOException {
+ return null;
+ }
+
+ @Override
+ public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException {
+ return null;
+ }
+
+ @Override
+ public boolean rename(final Path src, final Path dst) throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean delete(final Path f, final boolean recursive) throws IOException {
+ return false;
+ }
+
+ @Override
+ public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException {
+ final Set<FileStatus> statuses = fileStatuses.get(f);
+ if ( statuses == null ) {
+ return new FileStatus[0];
+ }
+
+ return statuses.toArray(new FileStatus[statuses.size()]);
+ }
+
+ @Override
+ public void setWorkingDirectory(final Path new_dir) {
+
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return new Path(new File(".").getAbsolutePath());
+ }
+
+ @Override
+ public boolean mkdirs(final Path f, final FsPermission permission) throws IOException {
+ return false;
+ }
+
+ @Override
+ public FileStatus getFileStatus(final Path f) throws IOException {
+ return null;
+ }
+
+ }
+
+
+ private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
+ private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
+ private boolean failOnCalls = false;
+
+ private void verifyNotFail() throws IOException {
+ if ( failOnCalls ) {
+ throw new IOException("Could not call to remote service because Unit Test marked service unavailable");
+ }
+ }
+
+ @Override
+ public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+ verifyNotFail();
+ final Object retValue = values.putIfAbsent(key, value);
+ return (retValue == null);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
+ final Deserializer<V> valueDeserializer) throws IOException {
+ verifyNotFail();
+ return (V) values.putIfAbsent(key, value);
+ }
+
+ @Override
+ public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
+ verifyNotFail();
+ return values.containsKey(key);
+ }
+
+ @Override
+ public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+ verifyNotFail();
+ values.put(key, value);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
+ verifyNotFail();
+ return (V) values.get(key);
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
+ verifyNotFail();
+ values.remove(key);
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
index 31f31a5..57d0d78 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
@@ -1,512 +1,512 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.lucene;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class IndexManager implements Closeable {
- private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);
-
- private final Lock lock = new ReentrantLock();
- private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
- private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
-
-
- public void removeIndex(final File indexDirectory) {
- final File absoluteFile = indexDirectory.getAbsoluteFile();
- logger.info("Removing index {}", indexDirectory);
-
- lock.lock();
- try {
- final IndexWriterCount count = writerCounts.remove(absoluteFile);
- if ( count != null ) {
- try {
- count.close();
- } catch (final IOException ioe) {
- logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
- }
-
- for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) {
- for ( final ActiveIndexSearcher searcher : searcherList ) {
- try {
- searcher.close();
- } catch (final IOException ioe) {
- logger.warn("Failed to close Index Searcher {} for {} due to {}",
- searcher.getSearcher(), absoluteFile, ioe);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
- public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
- final File absoluteFile = indexingDirectory.getAbsoluteFile();
- logger.debug("Borrowing index writer for {}", indexingDirectory);
-
- lock.lock();
- try {
- IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
- if ( writerCount == null ) {
- final List<Closeable> closeables = new ArrayList<>();
- final Directory directory = FSDirectory.open(indexingDirectory);
- closeables.add(directory);
-
- try {
- final Analyzer analyzer = new StandardAnalyzer();
- closeables.add(analyzer);
-
- final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
- config.setWriteLockTimeout(300000L);
-
- final IndexWriter indexWriter = new IndexWriter(directory, config);
- writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1);
- logger.debug("Providing new index writer for {}", indexingDirectory);
- } catch (final IOException ioe) {
- for ( final Closeable closeable : closeables ) {
- try {
- closeable.close();
- } catch (final IOException ioe2) {
- ioe.addSuppressed(ioe2);
- }
- }
-
- throw ioe;
- }
-
- writerCounts.put(absoluteFile, writerCount);
-
- // Mark any active searchers as poisoned because we are updating the index
- final List<ActiveIndexSearcher> searchers = activeSearchers.get(absoluteFile);
- if ( searchers != null ) {
- for (final ActiveIndexSearcher activeSearcher : searchers) {
- activeSearcher.poison();
- }
- }
- } else {
- logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
- writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
- writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
- }
-
- return writerCount.getWriter();
- } finally {
- lock.unlock();
- }
- }
-
- public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
- final File absoluteFile = indexingDirectory.getAbsoluteFile();
- logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory);
-
- lock.lock();
- try {
- final IndexWriterCount count = writerCounts.remove(absoluteFile);
-
- try {
- if ( count == null ) {
- logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
- + "This could potentially lead to a resource leak", writer, indexingDirectory);
- writer.close();
- } else if ( count.getCount() <= 1 ) {
- // we are finished with this writer.
- logger.debug("Closing Index Writer for {}", indexingDirectory);
- count.close();
- } else {
- // decrement the count.
- logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
- writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
- }
- } catch (final IOException ioe) {
- logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
-
- public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
- final File absoluteFile = indexDir.getAbsoluteFile();
- logger.debug("Borrowing index searcher for {}", indexDir);
-
- lock.lock();
- try {
- // check if we already have a reader cached.
- List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
- if ( currentlyCached == null ) {
- currentlyCached = new ArrayList<>();
- activeSearchers.put(absoluteFile, currentlyCached);
- } else {
- // keep track of any searchers that have been closed so that we can remove them
- // from our cache later.
- final Set<ActiveIndexSearcher> expired = new HashSet<>();
-
- try {
- for ( final ActiveIndexSearcher searcher : currentlyCached ) {
- if ( searcher.isCache() ) {
- // if the searcher is poisoned, we want to close and expire it.
- if ( searcher.isPoisoned() ) {
- logger.debug("Index Searcher for {} is poisoned; removing cached searcher", absoluteFile);
- expired.add(searcher);
- continue;
- }
-
- // if there are no references to the reader, it will have been closed. Since there is no
- // isClosed() method, this is how we determine whether it's been closed or not.
- final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
- if ( refCount <= 0 ) {
- // if refCount == 0, then the reader has been closed, so we need to discard the searcher
- logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
- + "removing cached searcher", absoluteFile, refCount);
- expired.add(searcher);
- continue;
- }
-
- logger.debug("Providing previously cached index searcher for {}", indexDir);
- return searcher.getSearcher();
- }
- }
- } finally {
- // if we have any expired index searchers, we need to close them and remove them
- // from the cache so that we don't try to use them again later.
- for ( final ActiveIndexSearcher searcher : expired ) {
- try {
- searcher.close();
- } catch (final Exception e) {
- logger.debug("Failed to close 'expired' IndexSearcher {}", searcher);
- }
-
- currentlyCached.remove(searcher);
- }
- }
- }
-
- final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
- if ( writerCount == null ) {
- final Directory directory = FSDirectory.open(absoluteFile);
- logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);
-
- try {
- final DirectoryReader directoryReader = DirectoryReader.open(directory);
- final IndexSearcher searcher = new IndexSearcher(directoryReader);
-
- // we want to cache the searcher that we create, since it's just a reader.
- final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true);
- currentlyCached.add(cached);
-
- return cached.getSearcher();
- } catch (final IOException e) {
- try {
- directory.close();
- } catch (final IOException ioe) {
- e.addSuppressed(ioe);
- }
-
- throw e;
- }
- } else {
- logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing "
- + "counter to {}", indexDir, writerCount.getCount() + 1);
-
- // increment the writer count to ensure that it's kept open.
- writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
- writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
-
- // create a new Index Searcher from the writer so that we don't have an issue with trying
- // to read from a directory that's locked. If we get the "no segments* file found" with
- // Lucene, this indicates that an IndexWriter already has the directory open.
- final IndexWriter writer = writerCount.getWriter();
- final DirectoryReader directoryReader = DirectoryReader.open(writer, false);
- final IndexSearcher searcher = new IndexSearcher(directoryReader);
-
- // we don't want to cache this searcher because it's based on a writer, so we want to get
- // new values the next time that we search.
- final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false);
-
- currentlyCached.add(activeSearcher);
- return activeSearcher.getSearcher();
- }
- } finally {
- lock.unlock();
- }
- }
-
-
- public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
- final File absoluteFile = indexDirectory.getAbsoluteFile();
- logger.debug("Returning index searcher for {} to IndexManager", indexDirectory);
-
- lock.lock();
- try {
- // check if we already have a reader cached.
- final List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
- if ( currentlyCached == null ) {
- logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
- + "result in a resource leak", indexDirectory);
- return;
- }
-
- // Check if the given searcher is in our list. We use an Iterator to do this so that if we
- // find it we can call remove() on the iterator if need be.
- final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator();
- while (itr.hasNext()) {
- final ActiveIndexSearcher activeSearcher = itr.next();
- if ( activeSearcher.getSearcher().equals(searcher) ) {
- if ( activeSearcher.isCache() ) {
- // if the searcher is poisoned, close it and remove from "pool".
- if ( activeSearcher.isPoisoned() ) {
- itr.remove();
-
- try {
- logger.debug("Closing Index Searcher for {} because it is poisoned", indexDirectory);
- activeSearcher.close();
- } catch (final IOException ioe) {
- logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
-
- return;
- } else {
- // the searcher is cached. Just leave it open.
- logger.debug("Index searcher for {} is cached; leaving open", indexDirectory);
- return;
- }
- } else {
- // searcher is not cached. It was created from a writer, and we want
- // the newest updates the next time that we get a searcher, so we will
- // go ahead and close this one out.
- itr.remove();
-
- // decrement the writer count because we incremented it when creating the searcher
- final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
- if ( writerCount != null ) {
- if ( writerCount.getCount() <= 1 ) {
- try {
- logger.debug("Index searcher for {} is not cached. Writer count is "
- + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1);
-
- writerCount.close();
- } catch (final IOException ioe) {
- logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
- } else {
- logger.debug("Index searcher for {} is not cached. Writer count is decremented "
- + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);
-
- writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
- writerCount.getAnalyzer(), writerCount.getDirectory(),
- writerCount.getCount() - 1));
- }
- }
-
- try {
- logger.debug("Closing Index Searcher for {}", indexDirectory);
- activeSearcher.close();
- } catch (final IOException ioe) {
- logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
- }
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public void close() throws IOException {
- logger.debug("Closing Index Manager");
-
- lock.lock();
- try {
- IOException ioe = null;
-
- for ( final IndexWriterCount count : writerCounts.values() ) {
- try {
- count.close();
- } catch (final IOException e) {
- if ( ioe == null ) {
- ioe = e;
- } else {
- ioe.addSuppressed(e);
- }
- }
- }
-
- for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) {
- for (final ActiveIndexSearcher searcher : searcherList) {
- try {
- searcher.close();
- } catch (final IOException e) {
- if ( ioe == null ) {
- ioe = e;
- } else {
- ioe.addSuppressed(e);
- }
- }
- }
- }
-
- if ( ioe != null ) {
- throw ioe;
- }
- } finally {
- lock.unlock();
- }
- }
-
-
- private static void close(final Closeable... closeables) throws IOException {
- IOException ioe = null;
- for ( final Closeable closeable : closeables ) {
- if ( closeable == null ) {
- continue;
- }
-
- try {
- closeable.close();
- } catch (final IOException e) {
- if ( ioe == null ) {
- ioe = e;
- } else {
- ioe.addSuppressed(e);
- }
- }
- }
-
- if ( ioe != null ) {
- throw ioe;
- }
- }
-
-
- private static class ActiveIndexSearcher implements Closeable {
- private final IndexSearcher searcher;
- private final DirectoryReader directoryReader;
- private final Directory directory;
- private final boolean cache;
- private boolean poisoned = false;
-
- public ActiveIndexSearcher(final IndexSearcher searcher, final DirectoryReader directoryReader,
- final Directory directory, final boolean cache) {
- this.searcher = searcher;
- this.directoryReader = directoryReader;
- this.directory = directory;
- this.cache = cache;
- }
-
- public boolean isCache() {
- return cache;
- }
-
- public IndexSearcher getSearcher() {
- return searcher;
- }
-
- public boolean isPoisoned() {
- return poisoned;
- }
-
- public void poison() {
- this.poisoned = true;
- }
-
- @Override
- public void close() throws IOException {
- IndexManager.close(directoryReader, directory);
- }
- }
-
-
- private static class IndexWriterCount implements Closeable {
- private final IndexWriter writer;
- private final Analyzer analyzer;
- private final Directory directory;
- private final int count;
-
- public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
- this.writer = writer;
- this.analyzer = analyzer;
- this.directory = directory;
- this.count = count;
- }
-
- public Analyzer getAnalyzer() {
- return analyzer;
- }
-
- public Directory getDirectory() {
- return directory;
- }
-
- public IndexWriter getWriter() {
- return writer;
- }
-
- public int getCount() {
- return count;
- }
-
- @Override
- public void close() throws IOException {
- IndexManager.close(writer, analyzer, directory);
- }
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.lucene;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IndexManager implements Closeable {
+ private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);
+
+ private final Lock lock = new ReentrantLock();
+ private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
+ private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
+
+
+ public void removeIndex(final File indexDirectory) {
+ final File absoluteFile = indexDirectory.getAbsoluteFile();
+ logger.info("Removing index {}", indexDirectory);
+
+ lock.lock();
+ try {
+ final IndexWriterCount count = writerCounts.remove(absoluteFile);
+ if ( count != null ) {
+ try {
+ count.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ }
+
+ for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) {
+ for ( final ActiveIndexSearcher searcher : searcherList ) {
+ try {
+ searcher.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Searcher {} for {} due to {}",
+ searcher.getSearcher(), absoluteFile, ioe);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
+ final File absoluteFile = indexingDirectory.getAbsoluteFile();
+ logger.debug("Borrowing index writer for {}", indexingDirectory);
+
+ lock.lock();
+ try {
+ IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+ if ( writerCount == null ) {
+ final List<Closeable> closeables = new ArrayList<>();
+ final Directory directory = FSDirectory.open(indexingDirectory);
+ closeables.add(directory);
+
+ try {
+ final Analyzer analyzer = new StandardAnalyzer();
+ closeables.add(analyzer);
+
+ final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
+ config.setWriteLockTimeout(300000L);
+
+ final IndexWriter indexWriter = new IndexWriter(directory, config);
+ writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1);
+ logger.debug("Providing new index writer for {}", indexingDirectory);
+ } catch (final IOException ioe) {
+ for ( final Closeable closeable : closeables ) {
+ try {
+ closeable.close();
+ } catch (final IOException ioe2) {
+ ioe.addSuppressed(ioe2);
+ }
+ }
+
+ throw ioe;
+ }
+
+ writerCounts.put(absoluteFile, writerCount);
+
+ // Mark any active searchers as poisoned because we are updating the index
+ final List<ActiveIndexSearcher> searchers = activeSearchers.get(absoluteFile);
+ if ( searchers != null ) {
+ for (final ActiveIndexSearcher activeSearcher : searchers) {
+ activeSearcher.poison();
+ }
+ }
+ } else {
+ logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
+ writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+ writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+ }
+
+ return writerCount.getWriter();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
+ final File absoluteFile = indexingDirectory.getAbsoluteFile();
+ logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory);
+
+ lock.lock();
+ try {
+ final IndexWriterCount count = writerCounts.remove(absoluteFile);
+
+ try {
+ if ( count == null ) {
+ logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
+ + "This could potentially lead to a resource leak", writer, indexingDirectory);
+ writer.close();
+ } else if ( count.getCount() <= 1 ) {
+ // we are finished with this writer.
+ logger.debug("Closing Index Writer for {}", indexingDirectory);
+ count.close();
+ } else {
+ // decrement the count.
+ logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
+ writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
+ }
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+
+ public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
+ final File absoluteFile = indexDir.getAbsoluteFile();
+ logger.debug("Borrowing index searcher for {}", indexDir);
+
+ lock.lock();
+ try {
+ // check if we already have a reader cached.
+ List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
+ if ( currentlyCached == null ) {
+ currentlyCached = new ArrayList<>();
+ activeSearchers.put(absoluteFile, currentlyCached);
+ } else {
+ // keep track of any searchers that have been closed so that we can remove them
+ // from our cache later.
+ final Set<ActiveIndexSearcher> expired = new HashSet<>();
+
+ try {
+ for ( final ActiveIndexSearcher searcher : currentlyCached ) {
+ if ( searcher.isCache() ) {
+ // if the searcher is poisoned, we want to close and expire it.
+ if ( searcher.isPoisoned() ) {
+ logger.debug("Index Searcher for {} is poisoned; removing cached searcher", absoluteFile);
+ expired.add(searcher);
+ continue;
+ }
+
+ // if there are no references to the reader, it will have been closed. Since there is no
+ // isClosed() method, this is how we determine whether it's been closed or not.
+ final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
+ if ( refCount <= 0 ) {
+ // if refCount == 0, then the reader has been closed, so we need to discard the searcher
+ logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
+ + "removing cached searcher", absoluteFile, refCount);
+ expired.add(searcher);
+ continue;
+ }
+
+ logger.debug("Providing previously cached index searcher for {}", indexDir);
+ return searcher.getSearcher();
+ }
+ }
+ } finally {
+ // if we have any expired index searchers, we need to close them and remove them
+ // from the cache so that we don't try to use them again later.
+ for ( final ActiveIndexSearcher searcher : expired ) {
+ try {
+ searcher.close();
+ } catch (final Exception e) {
+ logger.debug("Failed to close 'expired' IndexSearcher {}", searcher);
+ }
+
+ currentlyCached.remove(searcher);
+ }
+ }
+ }
+
+ final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+ if ( writerCount == null ) {
+ final Directory directory = FSDirectory.open(absoluteFile);
+ logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);
+
+ try {
+ final DirectoryReader directoryReader = DirectoryReader.open(directory);
+ final IndexSearcher searcher = new IndexSearcher(directoryReader);
+
+ // we want to cache the searcher that we create, since it's just a reader.
+ final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true);
+ currentlyCached.add(cached);
+
+ return cached.getSearcher();
+ } catch (final IOException e) {
+ try {
+ directory.close();
+ } catch (final IOException ioe) {
+ e.addSuppressed(ioe);
+ }
+
+ throw e;
+ }
+ } else {
+ logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing "
+ + "counter to {}", indexDir, writerCount.getCount() + 1);
+
+ // increment the writer count to ensure that it's kept open.
+ writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+ writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+
+ // create a new Index Searcher from the writer so that we don't have an issue with trying
+ // to read from a directory that's locked. If we get the "no segments* file found" with
+ // Lucene, this indicates that an IndexWriter already has the directory open.
+ final IndexWriter writer = writerCount.getWriter();
+ final DirectoryReader directoryReader = DirectoryReader.open(writer, false);
+ final IndexSearcher searcher = new IndexSearcher(directoryReader);
+
+ // we don't want to cache this searcher because it's based on a writer, so we want to get
+ // new values the next time that we search.
+ final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false);
+
+ currentlyCached.add(activeSearcher);
+ return activeSearcher.getSearcher();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+
+ public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
+ final File absoluteFile = indexDirectory.getAbsoluteFile();
+ logger.debug("Returning index searcher for {} to IndexManager", indexDirectory);
+
+ lock.lock();
+ try {
+ // check if we already have a reader cached.
+ final List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
+ if ( currentlyCached == null ) {
+ logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
+ + "result in a resource leak", indexDirectory);
+ return;
+ }
+
+ // Check if the given searcher is in our list. We use an Iterator to do this so that if we
+ // find it we can call remove() on the iterator if need be.
+ final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator();
+ while (itr.hasNext()) {
+ final ActiveIndexSearcher activeSearcher = itr.next();
+ if ( activeSearcher.getSearcher().equals(searcher) ) {
+ if ( activeSearcher.isCache() ) {
+ // if the searcher is poisoned, close it and remove from "pool".
+ if ( activeSearcher.isPoisoned() ) {
+ itr.remove();
+
+ try {
+ logger.debug("Closing Index Searcher for {} because it is poisoned", indexDirectory);
+ activeSearcher.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+
+ return;
+ } else {
+ // the searcher is cached. Just leave it open.
+ logger.debug("Index searcher for {} is cached; leaving open", indexDirectory);
+ return;
+ }
+ } else {
+ // searcher is not cached. It was created from a writer, and we want
+ // the newest updates the next time that we get a searcher, so we will
+ // go ahead and close this one out.
+ itr.remove();
+
+ // decrement the writer count because we incremented it when creating the searcher
+ final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+ if ( writerCount != null ) {
+ if ( writerCount.getCount() <= 1 ) {
+ try {
+ logger.debug("Index searcher for {} is not cached. Writer count is "
+ + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1);
+
+ writerCount.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ } else {
+ logger.debug("Index searcher for {} is not cached. Writer count is decremented "
+ + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);
+
+ writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+ writerCount.getAnalyzer(), writerCount.getDirectory(),
+ writerCount.getCount() - 1));
+ }
+ }
+
+ try {
+ logger.debug("Closing Index Searcher for {}", indexDirectory);
+ activeSearcher.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ }
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ logger.debug("Closing Index Manager");
+
+ lock.lock();
+ try {
+ IOException ioe = null;
+
+ for ( final IndexWriterCount count : writerCounts.values() ) {
+ try {
+ count.close();
+ } catch (final IOException e) {
+ if ( ioe == null ) {
+ ioe = e;
+ } else {
+ ioe.addSuppressed(e);
+ }
+ }
+ }
+
+ for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) {
+ for (final ActiveIndexSearcher searcher : searcherList) {
+ try {
+ searcher.close();
+ } catch (final IOException e) {
+ if ( ioe == null ) {
+ ioe = e;
+ } else {
+ ioe.addSuppressed(e);
+ }
+ }
+ }
+ }
+
+ if ( ioe != null ) {
+ throw ioe;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+
+ private static void close(final Closeable... closeables) throws IOException {
+ IOException ioe = null;
+ for ( final Closeable closeable : closeables ) {
+ if ( closeable == null ) {
+ continue;
+ }
+
+ try {
+ closeable.close();
+ } catch (final IOException e) {
+ if ( ioe == null ) {
+ ioe = e;
+ } else {
+ ioe.addSuppressed(e);
+ }
+ }
+ }
+
+ if ( ioe != null ) {
+ throw ioe;
+ }
+ }
+
+
+ private static class ActiveIndexSearcher implements Closeable {
+ private final IndexSearcher searcher;
+ private final DirectoryReader directoryReader;
+ private final Directory directory;
+ private final boolean cache;
+ private boolean poisoned = false;
+
+ public ActiveIndexSearcher(final IndexSearcher searcher, final DirectoryReader directoryReader,
+ final Directory directory, final boolean cache) {
+ this.searcher = searcher;
+ this.directoryReader = directoryReader;
+ this.directory = directory;
+ this.cache = cache;
+ }
+
+ public boolean isCache() {
+ return cache;
+ }
+
+ public IndexSearcher getSearcher() {
+ return searcher;
+ }
+
+ public boolean isPoisoned() {
+ return poisoned;
+ }
+
+ public void poison() {
+ this.poisoned = true;
+ }
+
+ @Override
+ public void close() throws IOException {
+ IndexManager.close(directoryReader, directory);
+ }
+ }
+
+
+ private static class IndexWriterCount implements Closeable {
+ private final IndexWriter writer;
+ private final Analyzer analyzer;
+ private final Directory directory;
+ private final int count;
+
+ public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
+ this.writer = writer;
+ this.analyzer = analyzer;
+ this.directory = directory;
+ this.count = count;
+ }
+
+ public Analyzer getAnalyzer() {
+ return analyzer;
+ }
+
+ public Directory getDirectory() {
+ return directory;
+ }
+
+ public IndexWriter getWriter() {
+ return writer;
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ @Override
+ public void close() throws IOException {
+ IndexManager.close(writer, analyzer, directory);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
index 61f86e7..60328fa 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
@@ -1,155 +1,155 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.toc;
-
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-
-/**
- * Standard implementation of TocReader.
- *
- * Expects .toc file to be in the following format;
- *
- * byte 0: version
- * byte 1: boolean: compressionFlag -> 0 = journal is NOT compressed, 1 = journal is compressed
- * byte 2-9: long: offset of block 0
- * byte 10-17: long: offset of block 1
- * ...
- * byte (N*8+2)-(N*8+9): long: offset of block N
- */
-public class StandardTocReader implements TocReader {
- private final boolean compressed;
- private final long[] offsets;
- private final long[] firstEventIds;
-
- public StandardTocReader(final File file) throws IOException {
- try (final FileInputStream fis = new FileInputStream(file);
- final DataInputStream dis = new DataInputStream(fis)) {
-
- final int version = dis.read();
- if ( version < 0 ) {
- throw new EOFException();
- }
-
- final int compressionFlag = dis.read();
- if ( compressionFlag < 0 ) {
- throw new EOFException();
- }
-
- if ( compressionFlag == 0 ) {
- compressed = false;
- } else if ( compressionFlag == 1 ) {
- compressed = true;
- } else {
- throw new IOException("Table of Contents appears to be corrupt: could not read 'compression flag' from header; expected value of 0 or 1 but got " + compressionFlag);
- }
-
- final int blockInfoBytes;
- switch (version) {
- case 1:
- blockInfoBytes = 8;
- break;
- case 2:
- default:
- blockInfoBytes = 16;
- break;
- }
-
- final int numBlocks = (int) ((file.length() - 2) / blockInfoBytes);
- offsets = new long[numBlocks];
-
- if ( version > 1 ) {
- firstEventIds = new long[numBlocks];
- } else {
- firstEventIds = new long[0];
- }
-
- for (int i=0; i < numBlocks; i++) {
- offsets[i] = dis.readLong();
-
- if ( version > 1 ) {
- firstEventIds[i] = dis.readLong();
- }
- }
- }
- }
-
- @Override
- public boolean isCompressed() {
- return compressed;
- }
-
- @Override
- public long getBlockOffset(final int blockIndex) {
- if ( blockIndex >= offsets.length ) {
- return -1L;
- }
- return offsets[blockIndex];
- }
-
- @Override
- public long getLastBlockOffset() {
- if ( offsets.length == 0 ) {
- return 0L;
- }
- return offsets[offsets.length - 1];
- }
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public int getBlockIndex(final long blockOffset) {
- for (int i=0; i < offsets.length; i++) {
- if ( offsets[i] > blockOffset ) {
- // if the offset is less than the offset of our first block,
- // just return 0 to indicate the first block. Otherwise,
- // return i-1 because i represents the first block whose offset is
- // greater than 'blockOffset'.
- return (i == 0) ? 0 : i-1;
- }
- }
-
- // None of the blocks have an offset greater than the provided offset.
- // Therefore, if the event is present, it must be in the last block.
- return offsets.length - 1;
- }
-
- @Override
- public Integer getBlockIndexForEventId(final long eventId) {
- // if we don't have event ID's stored in the TOC (which happens for version 1 of the TOC),
- // or if the event ID is less than the first Event ID in this TOC, then the Event ID
- // is unknown -- return null.
- if ( firstEventIds.length == 0 || eventId < firstEventIds[0] ) {
- return null;
- }
-
- for (int i=1; i < firstEventIds.length; i++) {
- if ( firstEventIds[i] > eventId ) {
- return i-1;
- }
- }
-
- // None of the blocks start with an Event ID greater than the provided ID.
- // Therefore, if the event is present, it must be in the last block.
- return firstEventIds.length - 1;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+/**
+ * Standard implementation of TocReader.
+ *
+ * Expects .toc file to be in the following format;
+ *
+ * byte 0: version
+ * byte 1: boolean: compressionFlag -> 0 = journal is NOT compressed, 1 = journal is compressed
+ * byte 2-9: long: offset of block 0
+ * byte 10-17: long: offset of block 1
+ * ...
+ * byte (N*8+2)-(N*8+9): long: offset of block N
+ */
+public class StandardTocReader implements TocReader {
+ private final boolean compressed;
+ private final long[] offsets;
+ private final long[] firstEventIds;
+
+ public StandardTocReader(final File file) throws IOException {
+ try (final FileInputStream fis = new FileInputStream(file);
+ final DataInputStream dis = new DataInputStream(fis)) {
+
+ final int version = dis.read();
+ if ( version < 0 ) {
+ throw new EOFException();
+ }
+
+ final int compressionFlag = dis.read();
+ if ( compressionFlag < 0 ) {
+ throw new EOFException();
+ }
+
+ if ( compressionFlag == 0 ) {
+ compressed = false;
+ } else if ( compressionFlag == 1 ) {
+ compressed = true;
+ } else {
+ throw new IOException("Table of Contents appears to be corrupt: could not read 'compression flag' from header; expected value of 0 or 1 but got " + compressionFlag);
+ }
+
+ final int blockInfoBytes;
+ switch (version) {
+ case 1:
+ blockInfoBytes = 8;
+ break;
+ case 2:
+ default:
+ blockInfoBytes = 16;
+ break;
+ }
+
+ final int numBlocks = (int) ((file.length() - 2) / blockInfoBytes);
+ offsets = new long[numBlocks];
+
+ if ( version > 1 ) {
+ firstEventIds = new long[numBlocks];
+ } else {
+ firstEventIds = new long[0];
+ }
+
+ for (int i=0; i < numBlocks; i++) {
+ offsets[i] = dis.readLong();
+
+ if ( version > 1 ) {
+ firstEventIds[i] = dis.readLong();
+ }
+ }
+ }
+ }
+
+ @Override
+ public boolean isCompressed() {
+ return compressed;
+ }
+
+ @Override
+ public long getBlockOffset(final int blockIndex) {
+ if ( blockIndex >= offsets.length ) {
+ return -1L;
+ }
+ return offsets[blockIndex];
+ }
+
+ @Override
+ public long getLastBlockOffset() {
+ if ( offsets.length == 0 ) {
+ return 0L;
+ }
+ return offsets[offsets.length - 1];
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public int getBlockIndex(final long blockOffset) {
+ for (int i=0; i < offsets.length; i++) {
+ if ( offsets[i] > blockOffset ) {
+ // if the offset is less than the offset of our first block,
+ // just return 0 to indicate the first block. Otherwise,
+ // return i-1 because i represents the first block whose offset is
+ // greater than 'blockOffset'.
+ return (i == 0) ? 0 : i-1;
+ }
+ }
+
+ // None of the blocks have an offset greater than the provided offset.
+ // Therefore, if the event is present, it must be in the last block.
+ return offsets.length - 1;
+ }
+
+ @Override
+ public Integer getBlockIndexForEventId(final long eventId) {
+ // if we don't have event ID's stored in the TOC (which happens for version 1 of the TOC),
+ // or if the event ID is less than the first Event ID in this TOC, then the Event ID
+ // is unknown -- return null.
+ if ( firstEventIds.length == 0 || eventId < firstEventIds[0] ) {
+ return null;
+ }
+
+ for (int i=1; i < firstEventIds.length; i++) {
+ if ( firstEventIds[i] > eventId ) {
+ return i-1;
+ }
+ }
+
+ // None of the blocks start with an Event ID greater than the provided ID.
+ // Therefore, if the event is present, it must be in the last block.
+ return firstEventIds.length - 1;
+ }
+}