You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/04/28 16:04:55 UTC
[21/50] [abbrv] incubator-nifi git commit: NIFI-527: Code cleanup
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java
index 8c266d1..0ffa5e6 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java
@@ -25,9 +25,9 @@ public interface ExpirationAction {
* Performs some action against the given File and returns the new File that
* contains the modified version
*
- * @param expiredFile
- * @return
- * @throws IOException
+ * @param expiredFile the file that was expired
+ * @return the new file after the file has been renamed, or the expiredFile if the file was not renamed
+ * @throws IOException if there was an IO problem
*/
File execute(File expiredFile) throws IOException;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
index 7db04aa..70bf36e 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
@@ -49,9 +49,9 @@ public class DeleteIndexAction implements ExpirationAction {
long numDeleted = 0;
long maxEventId = -1L;
try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, repository.getAllLogFiles())) {
- maxEventId = reader.getMaxEventId();
+ maxEventId = reader.getMaxEventId();
} catch (final IOException ioe) {
- logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath());
+ logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath());
}
// remove the records from the index
@@ -68,19 +68,19 @@ public class DeleteIndexAction implements ExpirationAction {
deleteDir = (docsLeft <= 0);
logger.debug("After expiring {}, there are {} docs left for index {}", expiredFile, docsLeft, indexingDirectory);
} finally {
- indexManager.returnIndexWriter(indexingDirectory, writer);
+ indexManager.returnIndexWriter(indexingDirectory, writer);
}
// we've confirmed that all documents have been removed. Delete the index directory.
if (deleteDir) {
- indexManager.removeIndex(indexingDirectory);
+ indexManager.removeIndex(indexingDirectory);
indexConfiguration.removeIndexDirectory(indexingDirectory);
-
+
deleteDirectory(indexingDirectory);
logger.info("Removed empty index directory {}", indexingDirectory);
}
}
-
+
// Update the minimum index to 1 more than the max Event ID in this file.
if (maxEventId > -1L) {
indexConfiguration.setMinIdIndexed(maxEventId + 1L);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
index 5a77f42..98137fb 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
@@ -45,12 +45,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DocsReader {
- private final Logger logger = LoggerFactory.getLogger(DocsReader.class);
-
+ private final Logger logger = LoggerFactory.getLogger(DocsReader.class);
+
public DocsReader(final List<File> storageDirectories) {
}
- public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException {
+ public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles,
+ final AtomicInteger retrievalCount, final int maxResults) throws IOException {
if (retrievalCount.get() >= maxResults) {
return Collections.emptySet();
}
@@ -73,42 +74,42 @@ public class DocsReader {
return read(docs, allProvenanceLogFiles);
}
-
+
private long getByteOffset(final Document d, final RecordReader reader) {
final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
if ( blockField != null ) {
- final int blockIndex = blockField.numericValue().intValue();
- final TocReader tocReader = reader.getTocReader();
- return tocReader.getBlockOffset(blockIndex);
+ final int blockIndex = blockField.numericValue().intValue();
+ final TocReader tocReader = reader.getTocReader();
+ return tocReader.getBlockOffset(blockIndex);
}
-
- return d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
+
+ return d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
}
-
-
+
+
private ProvenanceEventRecord getRecord(final Document d, final RecordReader reader) throws IOException {
- IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
- if ( blockField == null ) {
- reader.skipTo(getByteOffset(d, reader));
- } else {
- reader.skipToBlock(blockField.numericValue().intValue());
- }
-
+ IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
+ if ( blockField == null ) {
+ reader.skipTo(getByteOffset(d, reader));
+ } else {
+ reader.skipToBlock(blockField.numericValue().intValue());
+ }
+
StandardProvenanceEventRecord record;
while ( (record = reader.nextRecord()) != null) {
- IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());
- if ( idField == null || idField.numericValue().longValue() == record.getEventId() ) {
- break;
- }
+ IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());
+ if ( idField == null || idField.numericValue().longValue() == record.getEventId() ) {
+ break;
+ }
}
-
+
if ( record == null ) {
- throw new IOException("Failed to find Provenance Event " + d);
+ throw new IOException("Failed to find Provenance Event " + d);
} else {
- return record;
+ return record;
}
}
-
+
public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles) throws IOException {
LuceneUtil.sortDocsForRetrieval(docs);
@@ -119,23 +120,23 @@ public class DocsReader {
final long start = System.nanoTime();
int logFileCount = 0;
-
+
final Set<String> storageFilesToSkip = new HashSet<>();
-
+
try {
for (final Document d : docs) {
final String storageFilename = d.getField(FieldNames.STORAGE_FILENAME).stringValue();
if ( storageFilesToSkip.contains(storageFilename) ) {
- continue;
+ continue;
}
-
+
try {
if (reader != null && storageFilename.equals(lastStorageFilename)) {
- matchingRecords.add(getRecord(d, reader));
+ matchingRecords.add(getRecord(d, reader));
} else {
- logger.debug("Opening log file {}", storageFilename);
-
- logFileCount++;
+ logger.debug("Opening log file {}", storageFilename);
+
+ logFileCount++;
if (reader != null) {
reader.close();
}
@@ -143,20 +144,20 @@ public class DocsReader {
List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
if (potentialFiles.isEmpty()) {
logger.warn("Could not find Provenance Log File with basename {} in the "
- + "Provenance Repository; assuming file has expired and continuing without it", storageFilename);
+ + "Provenance Repository; assuming file has expired and continuing without it", storageFilename);
storageFilesToSkip.add(storageFilename);
continue;
}
if (potentialFiles.size() > 1) {
- throw new FileNotFoundException("Found multiple Provenance Log Files with basename " +
- storageFilename + " in the Provenance Repository");
+ throw new FileNotFoundException("Found multiple Provenance Log Files with basename " +
+ storageFilename + " in the Provenance Repository");
}
for (final File file : potentialFiles) {
try {
- reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles);
- matchingRecords.add(getRecord(d, reader));
+ reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles);
+ matchingRecords.add(getRecord(d, reader));
} catch (final IOException e) {
throw new IOException("Failed to retrieve record " + d + " from Provenance File " + file + " due to " + e, e);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
index 3943504..9c3ec31 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
@@ -41,65 +41,65 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IndexManager implements Closeable {
- private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);
-
- private final Lock lock = new ReentrantLock();
- private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
- private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
-
-
- public void removeIndex(final File indexDirectory) {
- final File absoluteFile = indexDirectory.getAbsoluteFile();
- logger.info("Removing index {}", indexDirectory);
-
- lock.lock();
- try {
- final IndexWriterCount count = writerCounts.remove(absoluteFile);
- if ( count != null ) {
- try {
- count.close();
- } catch (final IOException ioe) {
- logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
- }
-
- for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) {
- for ( final ActiveIndexSearcher searcher : searcherList ) {
- try {
- searcher.close();
- } catch (final IOException ioe) {
- logger.warn("Failed to close Index Searcher {} for {} due to {}",
- searcher.getSearcher(), absoluteFile, ioe);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
- public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
- final File absoluteFile = indexingDirectory.getAbsoluteFile();
- logger.debug("Borrowing index writer for {}", indexingDirectory);
-
- lock.lock();
- try {
- IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
- if ( writerCount == null ) {
- final List<Closeable> closeables = new ArrayList<>();
+ private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);
+
+ private final Lock lock = new ReentrantLock();
+ private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
+ private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
+
+
+ public void removeIndex(final File indexDirectory) {
+ final File absoluteFile = indexDirectory.getAbsoluteFile();
+ logger.info("Removing index {}", indexDirectory);
+
+ lock.lock();
+ try {
+ final IndexWriterCount count = writerCounts.remove(absoluteFile);
+ if ( count != null ) {
+ try {
+ count.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ }
+
+ for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) {
+ for ( final ActiveIndexSearcher searcher : searcherList ) {
+ try {
+ searcher.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Searcher {} for {} due to {}",
+ searcher.getSearcher(), absoluteFile, ioe);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
+ final File absoluteFile = indexingDirectory.getAbsoluteFile();
+ logger.debug("Borrowing index writer for {}", indexingDirectory);
+
+ lock.lock();
+ try {
+ IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+ if ( writerCount == null ) {
+ final List<Closeable> closeables = new ArrayList<>();
final Directory directory = FSDirectory.open(indexingDirectory);
closeables.add(directory);
-
+
try {
- final Analyzer analyzer = new StandardAnalyzer();
- closeables.add(analyzer);
-
+ final Analyzer analyzer = new StandardAnalyzer();
+ closeables.add(analyzer);
+
final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
config.setWriteLockTimeout(300000L);
@@ -107,361 +107,361 @@ public class IndexManager implements Closeable {
writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1);
logger.debug("Providing new index writer for {}", indexingDirectory);
} catch (final IOException ioe) {
- for ( final Closeable closeable : closeables ) {
- try {
- closeable.close();
- } catch (final IOException ioe2) {
- ioe.addSuppressed(ioe2);
- }
- }
-
- throw ioe;
+ for ( final Closeable closeable : closeables ) {
+ try {
+ closeable.close();
+ } catch (final IOException ioe2) {
+ ioe.addSuppressed(ioe2);
+ }
+ }
+
+ throw ioe;
}
-
+
writerCounts.put(absoluteFile, writerCount);
- } else {
- logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
- writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
- writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
- }
-
- return writerCount.getWriter();
- } finally {
- lock.unlock();
- }
- }
-
- public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
- final File absoluteFile = indexingDirectory.getAbsoluteFile();
- logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory);
-
- lock.lock();
- try {
- IndexWriterCount count = writerCounts.remove(absoluteFile);
-
- try {
- if ( count == null ) {
- logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
- + "This could potentially lead to a resource leak", writer, indexingDirectory);
- writer.close();
- } else if ( count.getCount() <= 1 ) {
- // we are finished with this writer.
- logger.debug("Closing Index Writer for {}", indexingDirectory);
- count.close();
- } else {
- // decrement the count.
- logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
- writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
- }
- } catch (final IOException ioe) {
- logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
-
- public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
- final File absoluteFile = indexDir.getAbsoluteFile();
- logger.debug("Borrowing index searcher for {}", indexDir);
-
- lock.lock();
- try {
- // check if we already have a reader cached.
- List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
- if ( currentlyCached == null ) {
- currentlyCached = new ArrayList<>();
- activeSearchers.put(absoluteFile, currentlyCached);
- } else {
- // keep track of any searchers that have been closed so that we can remove them
- // from our cache later.
- final Set<ActiveIndexSearcher> expired = new HashSet<>();
-
- try {
- for ( final ActiveIndexSearcher searcher : currentlyCached ) {
- if ( searcher.isCache() ) {
- final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
- if ( refCount <= 0 ) {
- // if refCount == 0, then the reader has been closed, so we need to discard the searcher
- logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
- + "removing cached searcher", absoluteFile, refCount);
- expired.add(searcher);
- continue;
- }
-
- logger.debug("Providing previously cached index searcher for {}", indexDir);
- return searcher.getSearcher();
- }
- }
- } finally {
- // if we have any expired index searchers, we need to close them and remove them
- // from the cache so that we don't try to use them again later.
- for ( final ActiveIndexSearcher searcher : expired ) {
- try {
- searcher.close();
- } catch (final Exception e) {
- logger.debug("Failed to close 'expired' IndexSearcher {}", searcher);
- }
-
- currentlyCached.remove(searcher);
- }
- }
- }
-
- IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
- if ( writerCount == null ) {
- final Directory directory = FSDirectory.open(absoluteFile);
- logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);
-
- try {
- final DirectoryReader directoryReader = DirectoryReader.open(directory);
- final IndexSearcher searcher = new IndexSearcher(directoryReader);
-
- // we want to cache the searcher that we create, since it's just a reader.
- final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true);
- currentlyCached.add(cached);
-
- return cached.getSearcher();
- } catch (final IOException e) {
- try {
- directory.close();
- } catch (final IOException ioe) {
- e.addSuppressed(ioe);
- }
-
- throw e;
- }
- } else {
- logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing "
- + "counter to {}", indexDir, writerCount.getCount() + 1);
-
- // increment the writer count to ensure that it's kept open.
- writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
- writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
-
- // create a new Index Searcher from the writer so that we don't have an issue with trying
- // to read from a directory that's locked. If we get the "no segments* file found" with
- // Lucene, this indicates that an IndexWriter already has the directory open.
- final IndexWriter writer = writerCount.getWriter();
- final DirectoryReader directoryReader = DirectoryReader.open(writer, false);
- final IndexSearcher searcher = new IndexSearcher(directoryReader);
-
- // we don't want to cache this searcher because it's based on a writer, so we want to get
- // new values the next time that we search.
- final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false);
-
- currentlyCached.add(activeSearcher);
- return activeSearcher.getSearcher();
- }
- } finally {
- lock.unlock();
- }
- }
-
-
- public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
- final File absoluteFile = indexDirectory.getAbsoluteFile();
- logger.debug("Returning index searcher for {} to IndexManager", indexDirectory);
-
- lock.lock();
- try {
- // check if we already have a reader cached.
- List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
- if ( currentlyCached == null ) {
- logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
- + "result in a resource leak", indexDirectory);
- return;
- }
-
- final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator();
- while (itr.hasNext()) {
- final ActiveIndexSearcher activeSearcher = itr.next();
- if ( activeSearcher.getSearcher().equals(searcher) ) {
- if ( activeSearcher.isCache() ) {
- // the searcher is cached. Just leave it open.
- logger.debug("Index searcher for {} is cached; leaving open", indexDirectory);
- return;
- } else {
- // searcher is not cached. It was created from a writer, and we want
- // the newest updates the next time that we get a searcher, so we will
- // go ahead and close this one out.
- itr.remove();
-
- // decrement the writer count because we incremented it when creating the searcher
- final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
- if ( writerCount != null ) {
- if ( writerCount.getCount() <= 1 ) {
- try {
- logger.debug("Index searcher for {} is not cached. Writer count is "
- + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1);
-
- writerCount.close();
- } catch (final IOException ioe) {
- logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
- } else {
- logger.debug("Index searcher for {} is not cached. Writer count is decremented "
- + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);
-
- writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
- writerCount.getAnalyzer(), writerCount.getDirectory(),
- writerCount.getCount() - 1));
- }
- }
-
- try {
- logger.debug("Closing Index Searcher for {}", indexDirectory);
- activeSearcher.close();
- } catch (final IOException ioe) {
- logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
- }
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public void close() throws IOException {
- logger.debug("Closing Index Manager");
-
- lock.lock();
- try {
- IOException ioe = null;
-
- for ( final IndexWriterCount count : writerCounts.values() ) {
- try {
- count.close();
- } catch (final IOException e) {
- if ( ioe == null ) {
- ioe = e;
- } else {
- ioe.addSuppressed(e);
- }
- }
- }
-
- for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) {
- for (final ActiveIndexSearcher searcher : searcherList) {
- try {
- searcher.close();
- } catch (final IOException e) {
- if ( ioe == null ) {
- ioe = e;
- } else {
- ioe.addSuppressed(e);
- }
- }
- }
- }
-
- if ( ioe != null ) {
- throw ioe;
- }
- } finally {
- lock.unlock();
- }
- }
-
-
- private static void close(final Closeable... closeables) throws IOException {
- IOException ioe = null;
- for ( final Closeable closeable : closeables ) {
- if ( closeable == null ) {
- continue;
- }
-
- try {
- closeable.close();
- } catch (final IOException e) {
- if ( ioe == null ) {
- ioe = e;
- } else {
- ioe.addSuppressed(e);
- }
- }
- }
-
- if ( ioe != null ) {
- throw ioe;
- }
- }
-
-
- private static class ActiveIndexSearcher implements Closeable {
- private final IndexSearcher searcher;
- private final DirectoryReader directoryReader;
- private final Directory directory;
- private final boolean cache;
-
- public ActiveIndexSearcher(IndexSearcher searcher, DirectoryReader directoryReader,
- Directory directory, final boolean cache) {
- this.searcher = searcher;
- this.directoryReader = directoryReader;
- this.directory = directory;
- this.cache = cache;
- }
-
- public boolean isCache() {
- return cache;
- }
-
- public IndexSearcher getSearcher() {
- return searcher;
- }
-
- @Override
- public void close() throws IOException {
- IndexManager.close(directoryReader, directory);
- }
- }
-
-
- private static class IndexWriterCount implements Closeable {
- private final IndexWriter writer;
- private final Analyzer analyzer;
- private final Directory directory;
- private final int count;
-
- public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
- this.writer = writer;
- this.analyzer = analyzer;
- this.directory = directory;
- this.count = count;
- }
-
- public Analyzer getAnalyzer() {
- return analyzer;
- }
-
- public Directory getDirectory() {
- return directory;
- }
-
- public IndexWriter getWriter() {
- return writer;
- }
-
- public int getCount() {
- return count;
- }
-
- @Override
- public void close() throws IOException {
- IndexManager.close(writer, analyzer, directory);
- }
- }
+ } else {
+ logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
+ writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+ writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+ }
+
+ return writerCount.getWriter();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
+ final File absoluteFile = indexingDirectory.getAbsoluteFile();
+ logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory);
+
+ lock.lock();
+ try {
+ IndexWriterCount count = writerCounts.remove(absoluteFile);
+
+ try {
+ if ( count == null ) {
+ logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
+ + "This could potentially lead to a resource leak", writer, indexingDirectory);
+ writer.close();
+ } else if ( count.getCount() <= 1 ) {
+ // we are finished with this writer.
+ logger.debug("Closing Index Writer for {}", indexingDirectory);
+ count.close();
+ } else {
+ // decrement the count.
+ logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
+ writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
+ }
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+
+ public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
+ final File absoluteFile = indexDir.getAbsoluteFile();
+ logger.debug("Borrowing index searcher for {}", indexDir);
+
+ lock.lock();
+ try {
+ // check if we already have a reader cached.
+ List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
+ if ( currentlyCached == null ) {
+ currentlyCached = new ArrayList<>();
+ activeSearchers.put(absoluteFile, currentlyCached);
+ } else {
+ // keep track of any searchers that have been closed so that we can remove them
+ // from our cache later.
+ final Set<ActiveIndexSearcher> expired = new HashSet<>();
+
+ try {
+ for ( final ActiveIndexSearcher searcher : currentlyCached ) {
+ if ( searcher.isCache() ) {
+ final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
+ if ( refCount <= 0 ) {
+ // if refCount == 0, then the reader has been closed, so we need to discard the searcher
+ logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
+ + "removing cached searcher", absoluteFile, refCount);
+ expired.add(searcher);
+ continue;
+ }
+
+ logger.debug("Providing previously cached index searcher for {}", indexDir);
+ return searcher.getSearcher();
+ }
+ }
+ } finally {
+ // if we have any expired index searchers, we need to close them and remove them
+ // from the cache so that we don't try to use them again later.
+ for ( final ActiveIndexSearcher searcher : expired ) {
+ try {
+ searcher.close();
+ } catch (final Exception e) {
+ logger.debug("Failed to close 'expired' IndexSearcher {}", searcher);
+ }
+
+ currentlyCached.remove(searcher);
+ }
+ }
+ }
+
+ IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+ if ( writerCount == null ) {
+ final Directory directory = FSDirectory.open(absoluteFile);
+ logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);
+
+ try {
+ final DirectoryReader directoryReader = DirectoryReader.open(directory);
+ final IndexSearcher searcher = new IndexSearcher(directoryReader);
+
+ // we want to cache the searcher that we create, since it's just a reader.
+ final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true);
+ currentlyCached.add(cached);
+
+ return cached.getSearcher();
+ } catch (final IOException e) {
+ try {
+ directory.close();
+ } catch (final IOException ioe) {
+ e.addSuppressed(ioe);
+ }
+
+ throw e;
+ }
+ } else {
+ logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing "
+ + "counter to {}", indexDir, writerCount.getCount() + 1);
+
+ // increment the writer count to ensure that it's kept open.
+ writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+ writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+
+ // create a new Index Searcher from the writer so that we don't have an issue with trying
+ // to read from a directory that's locked. If we get the "no segments* file found" with
+ // Lucene, this indicates that an IndexWriter already has the directory open.
+ final IndexWriter writer = writerCount.getWriter();
+ final DirectoryReader directoryReader = DirectoryReader.open(writer, false);
+ final IndexSearcher searcher = new IndexSearcher(directoryReader);
+
+ // we don't want to cache this searcher because it's based on a writer, so we want to get
+ // new values the next time that we search.
+ final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false);
+
+ currentlyCached.add(activeSearcher);
+ return activeSearcher.getSearcher();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+
+ /**
+ * Hands the given searcher back to this manager for the given index directory.
+ * <p>
+ * The searcher is looked up in the list of active searchers registered under the directory's
+ * absolute path. If no such list exists, a warning is logged and nothing else is done. If the
+ * matching entry is flagged as cached, it is left open. Otherwise the entry is removed from the
+ * list, the writer count registered for the directory is decremented (closing the writer when the
+ * count was 1 or less), and the searcher is closed. Failures to close the writer or the searcher
+ * are logged as warnings and are not propagated to the caller.
+ *
+ * @param indexDirectory the index directory whose active searchers are consulted
+ * @param searcher the searcher being returned
+ */
+ public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
+ final File absoluteFile = indexDirectory.getAbsoluteFile();
+ logger.debug("Returning index searcher for {} to IndexManager", indexDirectory);
+
+ lock.lock();
+ try {
+ // check if we already have a reader cached.
+ List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
+ if ( currentlyCached == null ) {
+ logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
+ + "result in a resource leak", indexDirectory);
+ return;
+ }
+
+ // an explicit Iterator is used so that the matching entry can be removed while iterating
+ final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator();
+ while (itr.hasNext()) {
+ final ActiveIndexSearcher activeSearcher = itr.next();
+ if ( activeSearcher.getSearcher().equals(searcher) ) {
+ if ( activeSearcher.isCache() ) {
+ // the searcher is cached. Just leave it open.
+ logger.debug("Index searcher for {} is cached; leaving open", indexDirectory);
+ return;
+ } else {
+ // searcher is not cached. It was created from a writer, and we want
+ // the newest updates the next time that we get a searcher, so we will
+ // go ahead and close this one out.
+ itr.remove();
+
+ // decrement the writer count because we incremented it when creating the searcher
+ // the entry is removed up front and only re-registered below (with the lower count)
+ // if the writer has to stay open
+ final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+ if ( writerCount != null ) {
+ if ( writerCount.getCount() <= 1 ) {
+ try {
+ logger.debug("Index searcher for {} is not cached. Writer count is "
+ + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1);
+
+ writerCount.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
+ // the stack trace is only logged when debug logging is enabled
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ } else {
+ logger.debug("Index searcher for {} is not cached. Writer count is decremented "
+ + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);
+
+ writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+ writerCount.getAnalyzer(), writerCount.getDirectory(),
+ writerCount.getCount() - 1));
+ }
+ }
+
+ // the searcher is closed after the writer count has been adjusted
+ try {
+ logger.debug("Closing Index Searcher for {}", indexDirectory);
+ activeSearcher.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
+ // the stack trace is only logged when debug logging is enabled
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ }
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Closes every Index Writer and every Index Searcher tracked by this manager while holding the
+ * manager's lock. A failure to close one resource does not stop the remaining resources from being
+ * closed: the first IOException encountered is thrown once everything has been attempted, and any
+ * later IOExceptions are attached to it as suppressed exceptions.
+ *
+ * @throws IOException if one or more of the writers or searchers could not be closed
+ */
+ @Override
+ public void close() throws IOException {
+ logger.debug("Closing Index Manager");
+
+ lock.lock();
+ try {
+ IOException failure = null;
+
+ // the writers are closed first, then the searchers
+ for ( final IndexWriterCount writerCount : writerCounts.values() ) {
+ try {
+ writerCount.close();
+ } catch (final IOException e) {
+ if ( failure != null ) {
+ failure.addSuppressed(e);
+ } else {
+ failure = e;
+ }
+ }
+ }
+
+ for ( final List<ActiveIndexSearcher> searchersForDirectory : activeSearchers.values() ) {
+ for ( final ActiveIndexSearcher activeSearcher : searchersForDirectory ) {
+ try {
+ activeSearcher.close();
+ } catch (final IOException e) {
+ if ( failure != null ) {
+ failure.addSuppressed(e);
+ } else {
+ failure = e;
+ }
+ }
+ }
+ }
+
+ if ( failure == null ) {
+ return;
+ }
+
+ throw failure;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+
+ /**
+ * Closes each of the given resources in the order supplied, skipping <code>null</code> entries.
+ * Every resource is attempted even if an earlier one fails to close.
+ *
+ * @param closeables the resources to close; individual elements may be <code>null</code>
+ * @throws IOException the first IOException thrown while closing, with any subsequent
+ * IOExceptions attached to it as suppressed exceptions
+ */
+ private static void close(final Closeable... closeables) throws IOException {
+ IOException failure = null;
+
+ for ( final Closeable resource : closeables ) {
+ if ( resource != null ) {
+ try {
+ resource.close();
+ } catch (final IOException e) {
+ if ( failure != null ) {
+ failure.addSuppressed(e);
+ } else {
+ failure = e;
+ }
+ }
+ }
+ }
+
+ if ( failure == null ) {
+ return;
+ }
+
+ throw failure;
+ }
+
+
+ private static class ActiveIndexSearcher implements Closeable {
+ private final IndexSearcher searcher;
+ private final DirectoryReader directoryReader;
+ private final Directory directory;
+ private final boolean cache;
+
+ public ActiveIndexSearcher(IndexSearcher searcher, DirectoryReader directoryReader,
+ Directory directory, final boolean cache) {
+ this.searcher = searcher;
+ this.directoryReader = directoryReader;
+ this.directory = directory;
+ this.cache = cache;
+ }
+
+ public boolean isCache() {
+ return cache;
+ }
+
+ public IndexSearcher getSearcher() {
+ return searcher;
+ }
+
+ @Override
+ public void close() throws IOException {
+ IndexManager.close(directoryReader, directory);
+ }
+ }
+
+
+ /**
+ * Holds an IndexWriter together with its Analyzer and Directory and a usage count. All fields are
+ * final; the count is never changed in place, so a different count requires a new instance.
+ */
+ private static class IndexWriterCount implements Closeable {
+ private final IndexWriter writer;
+ private final Analyzer analyzer;
+ private final Directory directory;
+ private final int count;
+
+ public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
+ this.writer = writer;
+ this.analyzer = analyzer;
+ this.directory = directory;
+ this.count = count;
+ }
+
+ /**
+ * @return the writer held by this object
+ */
+ public IndexWriter getWriter() {
+ return writer;
+ }
+
+ /**
+ * @return the analyzer held by this object
+ */
+ public Analyzer getAnalyzer() {
+ return analyzer;
+ }
+
+ /**
+ * @return the directory held by this object
+ */
+ public Directory getDirectory() {
+ return directory;
+ }
+
+ /**
+ * @return the count supplied at construction
+ */
+ public int getCount() {
+ return count;
+ }
+
+ /**
+ * Closes the writer, then the analyzer, then the directory.
+ *
+ * @throws IOException if any of the three could not be closed
+ */
+ @Override
+ public void close() throws IOException {
+ IndexManager.close(writer, analyzer, directory);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
index dcb6e08..53869f4 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
@@ -35,7 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IndexSearch {
- private final Logger logger = LoggerFactory.getLogger(IndexSearch.class);
+ private final Logger logger = LoggerFactory.getLogger(IndexSearch.class);
private final PersistentProvenanceRepository repository;
private final File indexDirectory;
private final IndexManager indexManager;
@@ -65,17 +65,17 @@ public class IndexSearch {
final long start = System.nanoTime();
IndexSearcher searcher = null;
try {
- searcher = indexManager.borrowIndexSearcher(indexDirectory);
+ searcher = indexManager.borrowIndexSearcher(indexDirectory);
final long searchStartNanos = System.nanoTime();
final long openSearcherNanos = searchStartNanos - start;
-
+
final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults());
final long finishSearch = System.nanoTime();
final long searchNanos = finishSearch - searchStartNanos;
-
- logger.debug("Searching {} took {} millis; opening searcher took {} millis", this,
- TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
-
+
+ logger.debug("Searching {} took {} millis; opening searcher took {} millis", this,
+ TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
+
if (topDocs.totalHits == 0) {
sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
return sqr;
@@ -83,31 +83,31 @@ public class IndexSearch {
final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories());
matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount, provenanceQuery.getMaxResults());
-
+
final long readRecordsNanos = System.nanoTime() - finishSearch;
logger.debug("Reading {} records took {} millis for {}", matchingRecords.size(), TimeUnit.NANOSECONDS.toMillis(readRecordsNanos), this);
-
+
sqr.update(matchingRecords, topDocs.totalHits);
return sqr;
} catch (final FileNotFoundException e) {
// nothing has been indexed yet, or the data has already aged off
- logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, e);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", e);
- }
-
+ logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, e);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", e);
+ }
+
sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
return sqr;
} finally {
- if ( searcher != null ) {
- indexManager.returnIndexSearcher(indexDirectory, searcher);
- }
+ if ( searcher != null ) {
+ indexManager.returnIndexSearcher(indexDirectory, searcher);
+ }
}
}
-
+
@Override
public String toString() {
- return "IndexSearcher[" + indexDirectory + "]";
+ return "IndexSearcher[" + indexDirectory + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
index 5e87913..46be391 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
@@ -16,50 +16,30 @@
*/
package org.apache.nifi.provenance.lucene;
-import java.io.EOFException;
-import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.IntField;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.provenance.IndexConfiguration;
import org.apache.nifi.provenance.PersistentProvenanceRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
-import org.apache.nifi.provenance.rollover.RolloverAction;
import org.apache.nifi.provenance.search.SearchableField;
-import org.apache.nifi.provenance.serialization.RecordReader;
-import org.apache.nifi.provenance.serialization.RecordReaders;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class IndexingAction implements RolloverAction {
-
- private final PersistentProvenanceRepository repository;
+public class IndexingAction {
private final Set<SearchableField> nonAttributeSearchableFields;
private final Set<SearchableField> attributeSearchableFields;
- private final IndexConfiguration indexConfiguration;
- private static final Logger logger = LoggerFactory.getLogger(IndexingAction.class);
-
- public IndexingAction(final PersistentProvenanceRepository repo, final IndexConfiguration indexConfig) {
- repository = repo;
- indexConfiguration = indexConfig;
+ public IndexingAction(final PersistentProvenanceRepository repo) {
attributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(repo.getConfiguration().getSearchableAttributes()));
nonAttributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(repo.getConfiguration().getSearchableFields()));
}
@@ -72,7 +52,7 @@ public class IndexingAction implements RolloverAction {
doc.add(new StringField(field.getSearchableFieldName(), value.toLowerCase(), store));
}
-
+
public void index(final StandardProvenanceEventRecord record, final IndexWriter indexWriter, final Integer blockIndex) throws IOException {
final Map<String, String> attributes = record.getAttributes();
@@ -105,14 +85,14 @@ public class IndexingAction implements RolloverAction {
doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), record.getEventTime(), Store.NO));
doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), record.getFileSize(), Store.NO));
doc.add(new StringField(FieldNames.STORAGE_FILENAME, storageFilename, Store.YES));
-
+
if ( blockIndex == null ) {
- doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset(), Store.YES));
+ doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset(), Store.YES));
} else {
- doc.add(new IntField(FieldNames.BLOCK_INDEX, blockIndex, Store.YES));
- doc.add(new LongField(SearchableFields.Identifier.getSearchableFieldName(), record.getEventId(), Store.YES));
+ doc.add(new IntField(FieldNames.BLOCK_INDEX, blockIndex, Store.YES));
+ doc.add(new LongField(SearchableFields.Identifier.getSearchableFieldName(), record.getEventId(), Store.YES));
}
-
+
for (final String lineageIdentifier : record.getLineageIdentifiers()) {
addField(doc, SearchableFields.LineageIdentifier, lineageIdentifier, Store.NO);
}
@@ -150,87 +130,4 @@ public class IndexingAction implements RolloverAction {
indexWriter.addDocument(doc);
}
}
-
- @Override
- public File execute(final File fileRolledOver) throws IOException {
- final File indexingDirectory = indexConfiguration.getWritableIndexDirectory(fileRolledOver);
- int indexCount = 0;
- long maxId = -1L;
-
- try (final Directory directory = FSDirectory.open(indexingDirectory);
- final Analyzer analyzer = new StandardAnalyzer()) {
-
- final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
- config.setWriteLockTimeout(300000L);
-
- try (final IndexWriter indexWriter = new IndexWriter(directory, config);
- final RecordReader reader = RecordReaders.newRecordReader(fileRolledOver, repository.getAllLogFiles())) {
- StandardProvenanceEventRecord record;
- while (true) {
- final Integer blockIndex;
- if ( reader.isBlockIndexAvailable() ) {
- blockIndex = reader.getBlockIndex();
- } else {
- blockIndex = null;
- }
-
- try {
- record = reader.nextRecord();
- } catch (final EOFException eof) {
- // system was restarted while writing to the log file. Nothing we can do here, so ignore this record.
- // On system restart, the FlowFiles should be back in their "original" queues, so the events will be re-created
- // when the data is re-processed
- break;
- }
-
- if (record == null) {
- break;
- }
-
- maxId = record.getEventId();
-
- index(record, indexWriter, blockIndex);
- indexCount++;
- }
-
- indexWriter.commit();
- } catch (final EOFException eof) {
- // nothing in the file. Move on.
- }
- } finally {
- if (maxId >= -1) {
- indexConfiguration.setMaxIdIndexed(maxId);
- }
- }
-
- final File newFile = new File(fileRolledOver.getParent(),
- LuceneUtil.substringBeforeLast(fileRolledOver.getName(), ".")
- + ".indexed."
- + LuceneUtil.substringAfterLast(fileRolledOver.getName(), "."));
-
- boolean renamed = false;
- for (int i = 0; i < 10 && !renamed; i++) {
- renamed = fileRolledOver.renameTo(newFile);
- if (!renamed) {
- try {
- Thread.sleep(25L);
- } catch (final InterruptedException e) {
- }
- }
- }
-
- if (renamed) {
- logger.info("Finished indexing Provenance Log File {} to index {} with {} records indexed and renamed file to {}",
- fileRolledOver, indexingDirectory, indexCount, newFile);
- return newFile;
- } else {
- logger.warn("Finished indexing Provenance Log File {} with {} records indexed but failed to rename file to {}; indexed {} records", new Object[]{fileRolledOver, indexCount, newFile, indexCount});
- return fileRolledOver;
- }
- }
-
- @Override
- public boolean hasBeenPerformed(final File fileRolledOver) {
- return fileRolledOver.getName().contains(".indexed.");
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
index 54cde15..3f75c00 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
@@ -48,7 +48,8 @@ public class LineageQuery {
public static final int MAX_LINEAGE_UUIDS = 100;
private static final Logger logger = LoggerFactory.getLogger(LineageQuery.class);
- public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final File indexDirectory, final String lineageIdentifier, final Collection<String> flowFileUuids) throws IOException {
+ public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final File indexDirectory,
+ final String lineageIdentifier, final Collection<String> flowFileUuids) throws IOException {
if (requireNonNull(flowFileUuids).size() > MAX_LINEAGE_UUIDS) {
throw new IllegalArgumentException(String.format("Cannot compute lineage for more than %s FlowFiles. This lineage contains %s.", MAX_LINEAGE_UUIDS, flowFileUuids.size()));
}
@@ -99,7 +100,8 @@ public class LineageQuery {
final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories());
final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, indexReader, repo.getAllLogFiles(), new AtomicInteger(0), Integer.MAX_VALUE);
final long readDocsEnd = System.nanoTime();
- logger.debug("Finished Lineage Query; Lucene search took {} millis, reading records took {} millis", TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd));
+ logger.debug("Finished Lineage Query; Lucene search took {} millis, reading records took {} millis",
+ TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd));
return recs;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
index 59dc10b..c622ea1 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
@@ -78,16 +78,16 @@ public class LuceneUtil {
final String searchString = baseName + ".";
for (final Path path : allProvenanceLogs) {
if (path.toFile().getName().startsWith(searchString)) {
- final File file = path.toFile();
- if ( file.exists() ) {
- matchingFiles.add(file);
- } else {
- final File dir = file.getParentFile();
- final File gzFile = new File(dir, file.getName() + ".gz");
- if ( gzFile.exists() ) {
- matchingFiles.add(gzFile);
- }
- }
+ final File file = path.toFile();
+ if ( file.exists() ) {
+ matchingFiles.add(file);
+ } else {
+ final File dir = file.getParentFile();
+ final File gzFile = new File(dir, file.getName() + ".gz");
+ if ( gzFile.exists() ) {
+ matchingFiles.add(gzFile);
+ }
+ }
}
}
@@ -144,16 +144,16 @@ public class LuceneUtil {
final IndexableField fileOffset1 = o1.getField(FieldNames.BLOCK_INDEX);
final IndexableField fileOffset2 = o1.getField(FieldNames.BLOCK_INDEX);
if ( fileOffset1 != null && fileOffset2 != null ) {
- final int blockIndexResult = Long.compare(fileOffset1.numericValue().longValue(), fileOffset2.numericValue().longValue());
- if ( blockIndexResult != 0 ) {
- return blockIndexResult;
- }
-
- final long eventId1 = o1.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
- final long eventId2 = o2.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
- return Long.compare(eventId1, eventId2);
+ final int blockIndexResult = Long.compare(fileOffset1.numericValue().longValue(), fileOffset2.numericValue().longValue());
+ if ( blockIndexResult != 0 ) {
+ return blockIndexResult;
+ }
+
+ final long eventId1 = o1.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
+ final long eventId2 = o2.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
+ return Long.compare(eventId1, eventId2);
}
-
+
final long offset1 = o1.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
final long offset2 = o2.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
return Long.compare(offset1, offset2);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/CompressionAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/CompressionAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/CompressionAction.java
deleted file mode 100644
index d014618..0000000
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/CompressionAction.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.rollover;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.nifi.stream.io.GZIPOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.provenance.lucene.IndexingAction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CompressionAction implements RolloverAction {
-
- private static final Logger logger = LoggerFactory.getLogger(IndexingAction.class);
-
- @Override
- public File execute(final File fileRolledOver) throws IOException {
- final File gzFile = new File(fileRolledOver.getParent(), fileRolledOver.getName() + ".gz");
- try (final FileInputStream in = new FileInputStream(fileRolledOver);
- final OutputStream fos = new FileOutputStream(gzFile);
- final GZIPOutputStream gzipOut = new GZIPOutputStream(fos, 1)) {
- StreamUtils.copy(in, gzipOut);
- in.getFD().sync();
- }
-
- boolean deleted = false;
- for (int i = 0; i < 10 && !deleted; i++) {
- deleted = fileRolledOver.delete();
- }
-
- logger.info("Finished compressing Provenance Log File {}", fileRolledOver);
- return gzFile;
- }
-
- @Override
- public boolean hasBeenPerformed(final File fileRolledOver) {
- return fileRolledOver.getName().contains(".gz");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/RolloverAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/RolloverAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/RolloverAction.java
deleted file mode 100644
index 33401e9..0000000
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/RolloverAction.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.rollover;
-
-import java.io.File;
-import java.io.IOException;
-
-public interface RolloverAction {
-
- /**
- * Performs some action against the given File and returns the new File that
- * contains the modified version
- *
- * @param fileRolledOver
- * @return
- * @throws IOException
- */
- File execute(File fileRolledOver) throws IOException;
-
- boolean hasBeenPerformed(File fileRolledOver);
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
index 8bdc88a..91c8222 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
@@ -24,75 +24,80 @@ import org.apache.nifi.provenance.toc.TocReader;
public interface RecordReader extends Closeable {
- /**
- * Returns the next record in the reader, or <code>null</code> if there is no more data available.
- * @return
- * @throws IOException
- */
+ /**
+ * Returns the next record in the reader, or <code>null</code> if there is no more data available.
+ * @return the next Provenance event in the stream
+ * @throws IOException if unable to read the next event from the stream
+ */
StandardProvenanceEventRecord nextRecord() throws IOException;
/**
* Skips the specified number of bytes
- * @param bytesToSkip
- * @throws IOException
+ * @param bytesToSkip the number of bytes to skip ahead
+ * @throws IOException if unable to skip ahead the specified number of bytes (e.g., the stream does
+ * not contain this many more bytes)
*/
void skip(long bytesToSkip) throws IOException;
/**
* Skips to the specified byte offset in the underlying stream.
- * @param position
+ * @param position the byte offset to skip to
* @throws IOException if the underlying stream throws IOException, or if the reader has already
* passed the specified byte offset
*/
void skipTo(long position) throws IOException;
-
+
/**
* Skips to the specified compression block
- *
- * @param blockIndex
+ *
+ * @param blockIndex the index of the compression block to skip to
* @throws IOException if the underlying stream throws IOException, or if the reader has already
* read passed the specified compression block index
* @throws IllegalStateException if the RecordReader does not have a TableOfContents associated with it
*/
void skipToBlock(int blockIndex) throws IOException;
-
+
/**
* Returns the block index that the Reader is currently reading from.
* Note that the block index is incremented at the beginning of the {@link #nextRecord()}
- * method. This means that this method will return the block from which the previous record was read,
+ * method. This means that this method will return the block from which the previous record was read,
* if calling {@link #nextRecord()} continually, not the block from which the next record will be read.
- * @return
+ *
+ * @return the current block index
+ * @throws IllegalStateException if the reader is reading a provenance event file that does not contain
+ * a Table of Contents
*/
int getBlockIndex();
-
+
/**
* Returns <code>true</code> if the compression block index is available. It will be available
* if and only if the reader is created with a TableOfContents
- *
- * @return
+ *
+ * @return true if the reader is reading from an event file that has a Table of Contents
*/
boolean isBlockIndexAvailable();
-
+
/**
* Returns the {@link TocReader} that is used to keep track of compression blocks, if one exists,
* <code>null</code> otherwise
- * @return
+ *
+ * @return the TocReader if the underlying event file has a Table of Contents, <code>null</code> otherwise.
*/
TocReader getTocReader();
-
+
/**
- * Returns the number of bytes that have been consumed from the stream (read or skipped).
- * @return
+ * @return the number of bytes that have been consumed from the stream (read or skipped).
*/
long getBytesConsumed();
-
+
/**
* Returns the ID of the last event in this record reader, or -1 if the reader has no records or
* has already read through all records. Note: This method will consume the stream until the end,
* so no more records will be available on this reader after calling this method.
- *
- * @return
- * @throws IOException
+ *
+ * @return the ID of the last event in this record reader, or -1 if the reader has no records or
+ * has already read through all records
+ * @throws IOException if unable to get id of the last event
*/
long getMaxEventId() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
index dff281c..cab5e6f 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
@@ -37,75 +37,75 @@ public class RecordReaders {
InputStream fis = null;
try {
- if (!file.exists()) {
- if (provenanceLogFiles != null) {
- final String baseName = LuceneUtil.substringBefore(file.getName(), ".") + ".";
- for (final Path path : provenanceLogFiles) {
- if (path.toFile().getName().startsWith(baseName)) {
- file = path.toFile();
- break;
- }
- }
- }
- }
-
- if ( file.exists() ) {
- try {
- fis = new FileInputStream(file);
- } catch (final FileNotFoundException fnfe) {
- fis = null;
- }
- }
-
- String filename = file.getName();
- openStream: while ( fis == null ) {
- final File dir = file.getParentFile();
- final String baseName = LuceneUtil.substringBefore(file.getName(), ".");
-
- // depending on which rollover actions have occurred, we could have 3 possibilities for the
- // filename that we need. The majority of the time, we will use the extension ".prov.indexed.gz"
- // because most often we are compressing on rollover and most often we have already finished
- // compressing by the time that we are querying the data.
- for ( final String extension : new String[] {".prov.gz", ".prov"} ) {
- file = new File(dir, baseName + extension);
- if ( file.exists() ) {
- try {
- fis = new FileInputStream(file);
- filename = baseName + extension;
- break openStream;
- } catch (final FileNotFoundException fnfe) {
- // file was modified by a RolloverAction after we verified that it exists but before we could
- // create an InputStream for it. Start over.
- fis = null;
- continue openStream;
- }
- }
- }
-
- break;
- }
-
- if ( fis == null ) {
- throw new FileNotFoundException("Unable to locate file " + originalFile);
- }
-
- final File tocFile = TocUtil.getTocFile(file);
- if ( tocFile.exists() ) {
- final TocReader tocReader = new StandardTocReader(tocFile);
- return new StandardRecordReader(fis, filename, tocReader);
- } else {
- return new StandardRecordReader(fis, filename);
- }
+ if (!file.exists()) {
+ if (provenanceLogFiles != null) {
+ final String baseName = LuceneUtil.substringBefore(file.getName(), ".") + ".";
+ for (final Path path : provenanceLogFiles) {
+ if (path.toFile().getName().startsWith(baseName)) {
+ file = path.toFile();
+ break;
+ }
+ }
+ }
+ }
+
+ if ( file.exists() ) {
+ try {
+ fis = new FileInputStream(file);
+ } catch (final FileNotFoundException fnfe) {
+ fis = null;
+ }
+ }
+
+ String filename = file.getName();
+ openStream: while ( fis == null ) {
+ final File dir = file.getParentFile();
+ final String baseName = LuceneUtil.substringBefore(file.getName(), ".");
+
+ // depending on which rollover actions have occurred, we could have 2 possibilities for the
+ // filename that we need. The majority of the time, we will use the extension ".prov.gz"
+ // because most often we are compressing on rollover and most often we have already finished
+ // compressing by the time that we are querying the data.
+ for ( final String extension : new String[] {".prov.gz", ".prov"} ) {
+ file = new File(dir, baseName + extension);
+ if ( file.exists() ) {
+ try {
+ fis = new FileInputStream(file);
+ filename = baseName + extension;
+ break openStream;
+ } catch (final FileNotFoundException fnfe) {
+ // file was modified by a RolloverAction after we verified that it exists but before we could
+ // create an InputStream for it. Start over.
+ fis = null;
+ continue openStream;
+ }
+ }
+ }
+
+ break;
+ }
+
+ if ( fis == null ) {
+ throw new FileNotFoundException("Unable to locate file " + originalFile);
+ }
+
+ final File tocFile = TocUtil.getTocFile(file);
+ if ( tocFile.exists() ) {
+ final TocReader tocReader = new StandardTocReader(tocFile);
+ return new StandardRecordReader(fis, filename, tocReader);
+ } else {
+ return new StandardRecordReader(fis, filename);
+ }
} catch (final IOException ioe) {
- if ( fis != null ) {
- try {
- fis.close();
- } catch (final IOException inner) {
- ioe.addSuppressed(inner);
- }
- }
-
- throw ioe;
+ if ( fis != null ) {
+ try {
+ fis.close();
+ } catch (final IOException inner) {
+ ioe.addSuppressed(inner);
+ }
+ }
+
+ throw ioe;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
index 58f4dc2..d89fd6f 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
@@ -28,31 +28,27 @@ public interface RecordWriter extends Closeable {
/**
* Writes header information to the underlying stream
*
- * @throws IOException
+ * @throws IOException if unable to write header information to the underlying stream
*/
void writeHeader() throws IOException;
/**
* Writes the given record out to the underlying stream
*
- * @param record
- * @param recordIdentifier
+ * @param record the record to write
+ * @param recordIdentifier the new identifier of the record
* @return the number of bytes written for the given records
- * @throws IOException
+ * @throws IOException if unable to write the record to the stream
*/
long writeRecord(ProvenanceEventRecord record, long recordIdentifier) throws IOException;
/**
- * Returns the number of Records that have been written to this RecordWriter
- *
- * @return
+ * @return the number of Records that have been written to this RecordWriter
*/
int getRecordsWritten();
/**
- * Returns the file that this RecordWriter is writing to
- *
- * @return
+ * @return the file that this RecordWriter is writing to
*/
File getFile();
@@ -73,19 +69,18 @@ public interface RecordWriter extends Closeable {
* not immediately available, returns <code>false</code>; otherwise, obtains
* the lock and returns <code>true</code>.
*
- * @return
+ * @return <code>true</code> if the lock was obtained, <code>false</code> otherwise.
*/
boolean tryLock();
/**
* Syncs the content written to this writer to disk.
- * @throws java.io.IOException
+ * @throws IOException if unable to sync content to disk
*/
void sync() throws IOException;
/**
- * Returns the TOC Writer that is being used to write the Table of Contents for this journal
- * @return
+ * @return the TOC Writer that is being used to write the Table of Contents for this journal
*/
TocWriter getTocWriter();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
index 47b7c7e..cf8f7b4 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
@@ -25,14 +25,14 @@ import org.apache.nifi.provenance.toc.TocUtil;
import org.apache.nifi.provenance.toc.TocWriter;
public class RecordWriters {
- private static final int DEFAULT_COMPRESSION_BLOCK_SIZE = 1024 * 1024; // 1 MB
+ private static final int DEFAULT_COMPRESSION_BLOCK_SIZE = 1024 * 1024; // 1 MB
public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc) throws IOException {
- return newRecordWriter(file, compressed, createToc, DEFAULT_COMPRESSION_BLOCK_SIZE);
+ return newRecordWriter(file, compressed, createToc, DEFAULT_COMPRESSION_BLOCK_SIZE);
}
-
+
public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc, final int compressionBlockBytes) throws IOException {
- final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null;
+ final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null;
return new StandardRecordWriter(file, tocWriter, compressed, compressionBlockBytes);
}