You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/11/17 15:52:03 UTC
[1/3] nifi git commit: NIFI-748 Fixed logic around handling partial
query results from provenance repository - Ensured that failures derived from
correlating Document to its actual provenance event do not fail the entire query
and instead produce partial results with warning messages
Repository: nifi
Updated Branches:
refs/heads/master d16392c61 -> 453b140d6
NIFI-748 Fixed logic around handling partial query results from provenance repository
- Ensured that failures derived from correlating a Document to its actual provenance event do not fail the entire query and instead produce partial results with warning messages
- Refactored DocsReader.read() operation.
- Added tests to validate two conditions where such failures could occur
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a4d93c62
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a4d93c62
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a4d93c62
Branch: refs/heads/master
Commit: a4d93c62c88f594ef0cd3739a0536769f1ab9b26
Parents: 36d00a6
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Fri Nov 13 14:08:39 2015 -0500
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Fri Nov 13 14:23:31 2015 -0500
----------------------------------------------------------------------
.../PersistentProvenanceRepository.java | 1 +
.../nifi/provenance/lucene/DocsReader.java | 116 ++++++-------------
.../nifi/provenance/lucene/IndexSearch.java | 2 +-
.../nifi/provenance/lucene/LineageQuery.java | 2 +-
.../nifi/provenance/lucene/LuceneUtil.java | 23 ++++
.../TestPersistentProvenanceRepository.java | 70 +++++++++++
6 files changed, 132 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4d93c62/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 89e1419..1740f51 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -1784,6 +1784,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
try {
Thread.sleep(100L);
} catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt();
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4d93c62/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
index c2a7609..7bd800b 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
@@ -17,15 +17,14 @@
package org.apache.nifi.provenance.lucene;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -47,9 +46,6 @@ import org.slf4j.LoggerFactory;
public class DocsReader {
private final Logger logger = LoggerFactory.getLogger(DocsReader.class);
- public DocsReader(final List<File> storageDirectories) {
- }
-
public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles,
final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
if (retrievalCount.get() >= maxResults) {
@@ -100,101 +96,61 @@ public class DocsReader {
}
}
- if ( record == null ) {
- throw new IOException("Failed to find Provenance Event " + d);
- } else {
- return record;
+ if (record == null) {
+ logger.warn("Failed to read Provenance Event for '" + d + "'. The event file may be missing or corrupted");
}
- }
+ return record;
+ }
public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles,
- final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
- if (retrievalCount.get() >= maxResults) {
- return Collections.emptySet();
- }
-
- LuceneUtil.sortDocsForRetrieval(docs);
-
- RecordReader reader = null;
- String lastStorageFilename = null;
- final Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>();
+ final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
final long start = System.nanoTime();
- int logFileCount = 0;
-
- final Set<String> storageFilesToSkip = new HashSet<>();
- int eventsReadThisFile = 0;
- try {
- for (final Document d : docs) {
- final String storageFilename = d.getField(FieldNames.STORAGE_FILENAME).stringValue();
- if ( storageFilesToSkip.contains(storageFilename) ) {
- continue;
- }
-
- try {
- if (reader != null && storageFilename.equals(lastStorageFilename)) {
- matchingRecords.add(getRecord(d, reader));
- eventsReadThisFile++;
-
- if ( retrievalCount.incrementAndGet() >= maxResults ) {
- break;
- }
- } else {
- logger.debug("Opening log file {}", storageFilename);
-
- logFileCount++;
- if (reader != null) {
- reader.close();
- }
+ Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>();
+ if (retrievalCount.get() >= maxResults) {
+ return matchingRecords;
+ }
- final List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
- if (potentialFiles.isEmpty()) {
- logger.warn("Could not find Provenance Log File with basename {} in the "
- + "Provenance Repository; assuming file has expired and continuing without it", storageFilename);
- storageFilesToSkip.add(storageFilename);
- continue;
- }
+ Map<String, List<Document>> byStorageNameDocGroups = LuceneUtil.groupDocsByStorageFileName(docs);
- if (potentialFiles.size() > 1) {
- throw new FileNotFoundException("Found multiple Provenance Log Files with basename " +
- storageFilename + " in the Provenance Repository");
- }
+ int eventsReadThisFile = 0;
+ int logFileCount = 0;
- for (final File file : potentialFiles) {
- try {
- if (reader != null) {
- logger.debug("Read {} records from previous file", eventsReadThisFile);
- }
-
- reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles, maxAttributeChars);
- matchingRecords.add(getRecord(d, reader));
- eventsReadThisFile = 1;
-
- if ( retrievalCount.incrementAndGet() >= maxResults ) {
- break;
- }
- } catch (final IOException e) {
- throw new IOException("Failed to retrieve record " + d + " from Provenance File " + file + " due to " + e, e);
+ for (String storageFileName : byStorageNameDocGroups.keySet()) {
+ File provenanceEventFile = LuceneUtil.getProvenanceLogFile(storageFileName, allProvenanceLogFiles);
+ if (provenanceEventFile != null) {
+ try (RecordReader reader = RecordReaders.newRecordReader(provenanceEventFile, allProvenanceLogFiles,
+ maxAttributeChars)) {
+ for (Document document : byStorageNameDocGroups.get(storageFileName)) {
+ ProvenanceEventRecord eRec = this.getRecord(document, reader);
+ if (eRec != null) {
+ matchingRecords.add(eRec);
+ eventsReadThisFile++;
+
+ if (retrievalCount.incrementAndGet() >= maxResults) {
+ break;
}
}
}
- } finally {
- lastStorageFilename = storageFilename;
+ } catch (Exception e) {
+ logger.warn("Failed while trying to read Provenance Events. The event file '"
+ + provenanceEventFile.getAbsolutePath() +
+ "' may be missing or corrupted.", e);
}
- }
- } finally {
- if (reader != null) {
- reader.close();
+ } else {
+ logger.warn("Could not find Provenance Log File with "
+ + "basename {} in the Provenance Repository; assuming "
+ + "file has expired and continuing without it", storageFileName);
}
}
logger.debug("Read {} records from previous file", eventsReadThisFile);
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- logger.debug("Took {} ms to read {} events from {} prov log files", millis, matchingRecords.size(), logFileCount);
+ logger.debug("Took {} ms to read {} events from {} prov log files", millis, matchingRecords.size(),
+ logFileCount);
return matchingRecords;
}
-
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4d93c62/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
index 7fcd8ab..b8661df 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
@@ -89,7 +89,7 @@ public class IndexSearch {
return sqr;
}
- final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories());
+ final DocsReader docsReader = new DocsReader();
matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount,
provenanceQuery.getMaxResults(), maxAttributeChars);
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4d93c62/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
index e9e6e63..ce60e03 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
@@ -93,7 +93,7 @@ public class LineageQuery {
final TopDocs uuidQueryTopDocs = searcher.search(query, MAX_QUERY_RESULTS);
final long searchEnd = System.nanoTime();
- final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories());
+ final DocsReader docsReader = new DocsReader();
final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, searcher.getIndexReader(), repo.getAllLogFiles(),
new AtomicInteger(0), Integer.MAX_VALUE, maxAttributeChars);
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4d93c62/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
index c622ea1..aa50b94 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
@@ -22,7 +22,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.provenance.SearchableFields;
@@ -160,4 +162,25 @@ public class LuceneUtil {
}
});
}
+
+ /**
+ * Will group documents based on the {@link FieldNames#STORAGE_FILENAME}.
+ *
+ * @param documents
+ * list of {@link Document}s
+ * @return a {@link Map} of document groups with
+ * {@link FieldNames#STORAGE_FILENAME} as key and {@link List} of
+ * {@link Document}s as value.
+ */
+ public static Map<String, List<Document>> groupDocsByStorageFileName(final List<Document> documents) {
+ Map<String, List<Document>> documentGroups = new HashMap<>();
+ for (Document document : documents) {
+ String fileName = document.get(FieldNames.STORAGE_FILENAME);
+ if (!documentGroups.containsKey(fileName)) {
+ documentGroups.put(fileName, new ArrayList<Document>());
+ }
+ documentGroups.get(fileName).add(document);
+ }
+ return documentGroups;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4d93c62/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 5e4aed0..02b9216 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -20,9 +20,11 @@ import static org.apache.nifi.provenance.TestUtil.createFlowFile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.FileFilter;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -36,6 +38,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.zip.GZIPOutputStream;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.core.SimpleAnalyzer;
@@ -48,6 +51,7 @@ import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.FSDirectory;
import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.provenance.lineage.EventNode;
import org.apache.nifi.provenance.lineage.Lineage;
import org.apache.nifi.provenance.lineage.LineageEdge;
@@ -869,6 +873,72 @@ public class TestPersistentProvenanceRepository {
}
}
+ /**
+ * Here the event file is simply corrupted by virtue of not having any event
+ * records while having correct headers
+ */
+ @Test
+ public void testWithWithEventFileMissingRecord() throws Exception {
+ File eventFile = this.prepCorruptedEventFileTests();
+
+ final Query query = new Query(UUID.randomUUID().toString());
+ query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "foo-*"));
+ query.setMaxResults(100);
+
+ DataOutputStream in = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(eventFile)));
+ in.writeUTF("BlahBlah");
+ in.writeInt(4);
+ in.close();
+ assertTrue(eventFile.exists());
+ final QueryResult result = repo.queryEvents(query);
+ assertEquals(10, result.getMatchingEvents().size());
+ }
+
+ /**
+ * Here the event file is simply corrupted by virtue of being empty (0
+ * bytes)
+ */
+ @Test
+ public void testWithWithEventFileCorrupted() throws Exception {
+ File eventFile = this.prepCorruptedEventFileTests();
+
+ final Query query = new Query(UUID.randomUUID().toString());
+ query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "foo-*"));
+ query.setMaxResults(100);
+ DataOutputStream in = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(eventFile)));
+ in.close();
+ final QueryResult result = repo.queryEvents(query);
+ assertEquals(10, result.getMatchingEvents().size());
+ }
+
+ private File prepCorruptedEventFileTests() throws Exception {
+ RepositoryConfiguration config = createConfiguration();
+ config.setMaxStorageCapacity(1024L * 1024L);
+ config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
+ config.setMaxEventFileCapacity(1024L * 1024L);
+ config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
+ config.setDesiredIndexSize(10);
+
+ repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
+ repo.initialize(getEventReporter());
+
+ String uuid = UUID.randomUUID().toString();
+ for (int i = 0; i < 20; i++) {
+ ProvenanceEventRecord record = repo.eventBuilder().fromFlowFile(mock(FlowFile.class))
+ .setEventType(ProvenanceEventType.CREATE).setComponentId("foo-" + i).setComponentType("myComponent")
+ .setFlowFileUUID(uuid).build();
+ repo.registerEvent(record);
+ if (i == 9) {
+ repo.waitForRollover();
+ Thread.sleep(2000L);
+ }
+ }
+ repo.waitForRollover();
+ File eventFile = new File(config.getStorageDirectories().get(0), "10.prov.gz");
+ assertTrue(eventFile.delete());
+ return eventFile;
+ }
+
@Test
public void testIndexDirectoryRemoved() throws InterruptedException, IOException, ParseException {
final RepositoryConfiguration config = createConfiguration();
[3/3] nifi git commit: Merge branch 'NIFI-748' of
https://github.com/olegz/nifi into NIFI-748
Posted by ma...@apache.org.
Merge branch 'NIFI-748' of https://github.com/olegz/nifi into NIFI-748
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/453b140d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/453b140d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/453b140d
Branch: refs/heads/master
Commit: 453b140d6bbd73bc63fa3e8cb9e7072eb7709e50
Parents: d16392c 15880f9
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Nov 17 09:23:10 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Nov 17 09:23:10 2015 -0500
----------------------------------------------------------------------
.../PersistentProvenanceRepository.java | 1 +
.../nifi/provenance/lucene/DocsReader.java | 115 ++++++-------------
.../nifi/provenance/lucene/IndexSearch.java | 2 +-
.../nifi/provenance/lucene/LineageQuery.java | 2 +-
.../nifi/provenance/lucene/LuceneUtil.java | 36 +++++-
.../TestPersistentProvenanceRepository.java | 70 +++++++++++
6 files changed, 144 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
[2/3] nifi git commit: NIFI-748 addressed PR comments - made
DocsReader package private - polished logic in read(..) method to avoid
escaping the loop - added call to sorting logic in
LuceneUtil.groupDocsByStorageFileName(..) to ensure that previous behavior is preserved
Posted by ma...@apache.org.
NIFI-748 addressed PR comments
- made DocsReader package private
- polished logic in read(..) method to avoid escaping the loop
- added call to sorting logic in LuceneUtil.groupDocsByStorageFileName(..) to ensure that previous behavior and assumptions in read(..) method are preserved
- other minor polishing
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/15880f9f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/15880f9f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/15880f9f
Branch: refs/heads/master
Commit: 15880f9fcc78e7a040e0d64d2a517390122e3fe3
Parents: a4d93c6
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Mon Nov 16 08:39:23 2015 -0500
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Mon Nov 16 08:39:23 2015 -0500
----------------------------------------------------------------------
.../nifi/provenance/lucene/DocsReader.java | 21 ++++++++++----------
.../nifi/provenance/lucene/LuceneUtil.java | 15 ++++++++++++--
2 files changed, 24 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/15880f9f/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
index 7bd800b..703a5b8 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
@@ -22,6 +22,7 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -43,7 +44,7 @@ import org.apache.lucene.search.TopDocs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DocsReader {
+class DocsReader {
private final Logger logger = LoggerFactory.getLogger(DocsReader.class);
public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles,
@@ -106,12 +107,13 @@ public class DocsReader {
public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles,
final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
+ if (retrievalCount.get() >= maxResults) {
+ return Collections.emptySet();
+ }
+
final long start = System.nanoTime();
Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>();
- if (retrievalCount.get() >= maxResults) {
- return matchingRecords;
- }
Map<String, List<Document>> byStorageNameDocGroups = LuceneUtil.groupDocsByStorageFileName(docs);
@@ -123,17 +125,16 @@ public class DocsReader {
if (provenanceEventFile != null) {
try (RecordReader reader = RecordReaders.newRecordReader(provenanceEventFile, allProvenanceLogFiles,
maxAttributeChars)) {
- for (Document document : byStorageNameDocGroups.get(storageFileName)) {
- ProvenanceEventRecord eRec = this.getRecord(document, reader);
+
+ Iterator<Document> docIter = byStorageNameDocGroups.get(storageFileName).iterator();
+ while (docIter.hasNext() && retrievalCount.incrementAndGet() < maxResults){
+ ProvenanceEventRecord eRec = this.getRecord(docIter.next(), reader);
if (eRec != null) {
matchingRecords.add(eRec);
eventsReadThisFile++;
-
- if (retrievalCount.incrementAndGet() >= maxResults) {
- break;
- }
}
}
+
} catch (Exception e) {
logger.warn("Failed while trying to read Provenance Events. The event file '"
+ provenanceEventFile.getAbsolutePath() +
http://git-wip-us.apache.org/repos/asf/nifi/blob/15880f9f/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
index aa50b94..08a99d6 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
@@ -130,8 +130,14 @@ public class LuceneUtil {
return luceneQuery;
}
+ /**
+ * Will sort documents by filename and then file offset so that we can
+ * retrieve the records efficiently
+ *
+ * @param documents
+ * list of {@link Document}s
+ */
public static void sortDocsForRetrieval(final List<Document> documents) {
- // sort by filename and then file offset so that we can retrieve the records efficiently
Collections.sort(documents, new Comparator<Document>() {
@Override
public int compare(final Document o1, final Document o2) {
@@ -167,7 +173,9 @@ public class LuceneUtil {
* Will group documents based on the {@link FieldNames#STORAGE_FILENAME}.
*
* @param documents
- * list of {@link Document}s
+ * list of {@link Document}s which will be sorted via
+ * {@link #sortDocsForRetrieval(List)} for more efficient record
+ * retrieval.
* @return a {@link Map} of document groups with
* {@link FieldNames#STORAGE_FILENAME} as key and {@link List} of
* {@link Document}s as value.
@@ -181,6 +189,9 @@ public class LuceneUtil {
}
documentGroups.get(fileName).add(document);
}
+ for (List<Document> groupedDocuments : documentGroups.values()) {
+ sortDocsForRetrieval(groupedDocuments);
+ }
return documentGroups;
}
}