You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/11/17 15:52:03 UTC

[1/3] nifi git commit: NIFI-748 Fixed logic around handling partial query results from provenance repository - Ensured that failures derived from correlating Document to its actual provenance event do not fail the entire query and produce partial results wit

Repository: nifi
Updated Branches:
  refs/heads/master d16392c61 -> 453b140d6


NIFI-748 Fixed logic around handling partial query results from provenance repository
- Ensured that failures derived from correlating a Document to its actual provenance event do not fail the entire query and instead produce partial results with warning messages
- Refactored DocsReader.read() operation.
- Added tests to validate two conditions where such failures could occur


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a4d93c62
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a4d93c62
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a4d93c62

Branch: refs/heads/master
Commit: a4d93c62c88f594ef0cd3739a0536769f1ab9b26
Parents: 36d00a6
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Fri Nov 13 14:08:39 2015 -0500
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Fri Nov 13 14:23:31 2015 -0500

----------------------------------------------------------------------
 .../PersistentProvenanceRepository.java         |   1 +
 .../nifi/provenance/lucene/DocsReader.java      | 116 ++++++-------------
 .../nifi/provenance/lucene/IndexSearch.java     |   2 +-
 .../nifi/provenance/lucene/LineageQuery.java    |   2 +-
 .../nifi/provenance/lucene/LuceneUtil.java      |  23 ++++
 .../TestPersistentProvenanceRepository.java     |  70 +++++++++++
 6 files changed, 132 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a4d93c62/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 89e1419..1740f51 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -1784,6 +1784,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             try {
                 Thread.sleep(100L);
             } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a4d93c62/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
index c2a7609..7bd800b 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
@@ -17,15 +17,14 @@
 package org.apache.nifi.provenance.lucene;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -47,9 +46,6 @@ import org.slf4j.LoggerFactory;
 public class DocsReader {
     private final Logger logger = LoggerFactory.getLogger(DocsReader.class);
 
-    public DocsReader(final List<File> storageDirectories) {
-    }
-
     public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles,
             final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
         if (retrievalCount.get() >= maxResults) {
@@ -100,101 +96,61 @@ public class DocsReader {
             }
         }
 
-        if ( record == null ) {
-            throw new IOException("Failed to find Provenance Event " + d);
-        } else {
-            return record;
+        if (record == null) {
+            logger.warn("Failed to read Provenance Event for '" + d + "'. The event file may be missing or corrupted");
         }
-    }
 
+        return record;
+    }
 
     public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles,
-        final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
-        if (retrievalCount.get() >= maxResults) {
-            return Collections.emptySet();
-        }
-
-        LuceneUtil.sortDocsForRetrieval(docs);
-
-        RecordReader reader = null;
-        String lastStorageFilename = null;
-        final Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>();
+            final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
 
         final long start = System.nanoTime();
-        int logFileCount = 0;
-
-        final Set<String> storageFilesToSkip = new HashSet<>();
-        int eventsReadThisFile = 0;
 
-        try {
-            for (final Document d : docs) {
-                final String storageFilename = d.getField(FieldNames.STORAGE_FILENAME).stringValue();
-                if ( storageFilesToSkip.contains(storageFilename) ) {
-                    continue;
-                }
-
-                try {
-                    if (reader != null && storageFilename.equals(lastStorageFilename)) {
-                        matchingRecords.add(getRecord(d, reader));
-                        eventsReadThisFile++;
-
-                        if ( retrievalCount.incrementAndGet() >= maxResults ) {
-                            break;
-                        }
-                    } else {
-                        logger.debug("Opening log file {}", storageFilename);
-
-                        logFileCount++;
-                        if (reader != null) {
-                            reader.close();
-                        }
+        Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>();
+        if (retrievalCount.get() >= maxResults) {
+            return matchingRecords;
+        }
 
-                        final List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
-                        if (potentialFiles.isEmpty()) {
-                            logger.warn("Could not find Provenance Log File with basename {} in the "
-                                    + "Provenance Repository; assuming file has expired and continuing without it", storageFilename);
-                            storageFilesToSkip.add(storageFilename);
-                            continue;
-                        }
+        Map<String, List<Document>> byStorageNameDocGroups = LuceneUtil.groupDocsByStorageFileName(docs);
 
-                        if (potentialFiles.size() > 1) {
-                            throw new FileNotFoundException("Found multiple Provenance Log Files with basename " +
-                                    storageFilename + " in the Provenance Repository");
-                        }
+        int eventsReadThisFile = 0;
+        int logFileCount = 0;
 
-                        for (final File file : potentialFiles) {
-                            try {
-                                if (reader != null) {
-                                    logger.debug("Read {} records from previous file", eventsReadThisFile);
-                                }
-
-                                reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles, maxAttributeChars);
-                                matchingRecords.add(getRecord(d, reader));
-                                eventsReadThisFile = 1;
-
-                                if ( retrievalCount.incrementAndGet() >= maxResults ) {
-                                    break;
-                                }
-                            } catch (final IOException e) {
-                                throw new IOException("Failed to retrieve record " + d + " from Provenance File " + file + " due to " + e, e);
+        for (String storageFileName : byStorageNameDocGroups.keySet()) {
+            File provenanceEventFile = LuceneUtil.getProvenanceLogFile(storageFileName, allProvenanceLogFiles);
+            if (provenanceEventFile != null) {
+                try (RecordReader reader = RecordReaders.newRecordReader(provenanceEventFile, allProvenanceLogFiles,
+                        maxAttributeChars)) {
+                    for (Document document : byStorageNameDocGroups.get(storageFileName)) {
+                        ProvenanceEventRecord eRec = this.getRecord(document, reader);
+                        if (eRec != null) {
+                            matchingRecords.add(eRec);
+                            eventsReadThisFile++;
+
+                            if (retrievalCount.incrementAndGet() >= maxResults) {
+                                break;
                             }
                         }
                     }
-                } finally {
-                    lastStorageFilename = storageFilename;
+                } catch (Exception e) {
+                    logger.warn("Failed while trying to read Provenance Events. The event file '"
+                            + provenanceEventFile.getAbsolutePath() +
+                            "' may be missing or corrupted.", e);
                 }
-            }
-        } finally {
-            if (reader != null) {
-                reader.close();
+            } else {
+                logger.warn("Could not find Provenance Log File with "
+                        + "basename {} in the Provenance Repository; assuming "
+                        + "file has expired and continuing without it", storageFileName);
             }
         }
 
         logger.debug("Read {} records from previous file", eventsReadThisFile);
         final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-        logger.debug("Took {} ms to read {} events from {} prov log files", millis, matchingRecords.size(), logFileCount);
+        logger.debug("Took {} ms to read {} events from {} prov log files", millis, matchingRecords.size(),
+                logFileCount);
 
         return matchingRecords;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a4d93c62/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
index 7fcd8ab..b8661df 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
@@ -89,7 +89,7 @@ public class IndexSearch {
                 return sqr;
             }
 
-            final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories());
+            final DocsReader docsReader = new DocsReader();
             matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount,
                 provenanceQuery.getMaxResults(), maxAttributeChars);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a4d93c62/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
index e9e6e63..ce60e03 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
@@ -93,7 +93,7 @@ public class LineageQuery {
                 final TopDocs uuidQueryTopDocs = searcher.search(query, MAX_QUERY_RESULTS);
                 final long searchEnd = System.nanoTime();
 
-                final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories());
+                final DocsReader docsReader = new DocsReader();
                 final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, searcher.getIndexReader(), repo.getAllLogFiles(),
                     new AtomicInteger(0), Integer.MAX_VALUE, maxAttributeChars);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a4d93c62/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
index c622ea1..aa50b94 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
@@ -22,7 +22,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.provenance.SearchableFields;
@@ -160,4 +162,25 @@ public class LuceneUtil {
             }
         });
     }
+
+    /**
+     * Will group documents based on the {@link FieldNames#STORAGE_FILENAME}.
+     *
+     * @param documents
+     *            list of {@link Document}s
+     * @return a {@link Map} of document groups with
+     *         {@link FieldNames#STORAGE_FILENAME} as key and {@link List} of
+     *         {@link Document}s as value.
+     */
+    public static Map<String, List<Document>> groupDocsByStorageFileName(final List<Document> documents) {
+        Map<String, List<Document>> documentGroups = new HashMap<>();
+        for (Document document : documents) {
+            String fileName = document.get(FieldNames.STORAGE_FILENAME);
+            if (!documentGroups.containsKey(fileName)) {
+                documentGroups.put(fileName, new ArrayList<Document>());
+            }
+            documentGroups.get(fileName).add(document);
+        }
+        return documentGroups;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a4d93c62/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 5e4aed0..02b9216 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -20,9 +20,11 @@ import static org.apache.nifi.provenance.TestUtil.createFlowFile;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 import java.io.File;
 import java.io.FileFilter;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -36,6 +38,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.zip.GZIPOutputStream;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.core.SimpleAnalyzer;
@@ -48,6 +51,7 @@ import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.provenance.lineage.EventNode;
 import org.apache.nifi.provenance.lineage.Lineage;
 import org.apache.nifi.provenance.lineage.LineageEdge;
@@ -869,6 +873,72 @@ public class TestPersistentProvenanceRepository {
         }
     }
 
+    /**
+     * Here the event file is simply corrupted by virtue of not having any event
+     * records while having correct headers
+     */
+    @Test
+    public void testWithWithEventFileMissingRecord() throws Exception {
+        File eventFile = this.prepCorruptedEventFileTests();
+
+        final Query query = new Query(UUID.randomUUID().toString());
+        query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "foo-*"));
+        query.setMaxResults(100);
+
+        DataOutputStream in = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(eventFile)));
+        in.writeUTF("BlahBlah");
+        in.writeInt(4);
+        in.close();
+        assertTrue(eventFile.exists());
+        final QueryResult result = repo.queryEvents(query);
+        assertEquals(10, result.getMatchingEvents().size());
+    }
+
+    /**
+     * Here the event file is simply corrupted by virtue of being empty (0
+     * bytes)
+     */
+    @Test
+    public void testWithWithEventFileCorrupted() throws Exception {
+        File eventFile = this.prepCorruptedEventFileTests();
+
+        final Query query = new Query(UUID.randomUUID().toString());
+        query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "foo-*"));
+        query.setMaxResults(100);
+        DataOutputStream in = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(eventFile)));
+        in.close();
+        final QueryResult result = repo.queryEvents(query);
+        assertEquals(10, result.getMatchingEvents().size());
+    }
+
+    private File prepCorruptedEventFileTests() throws Exception {
+        RepositoryConfiguration config = createConfiguration();
+        config.setMaxStorageCapacity(1024L * 1024L);
+        config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
+        config.setMaxEventFileCapacity(1024L * 1024L);
+        config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
+        config.setDesiredIndexSize(10);
+
+        repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
+        repo.initialize(getEventReporter());
+
+        String uuid = UUID.randomUUID().toString();
+        for (int i = 0; i < 20; i++) {
+            ProvenanceEventRecord record = repo.eventBuilder().fromFlowFile(mock(FlowFile.class))
+                    .setEventType(ProvenanceEventType.CREATE).setComponentId("foo-" + i).setComponentType("myComponent")
+                    .setFlowFileUUID(uuid).build();
+            repo.registerEvent(record);
+            if (i == 9) {
+                repo.waitForRollover();
+                Thread.sleep(2000L);
+            }
+        }
+        repo.waitForRollover();
+        File eventFile = new File(config.getStorageDirectories().get(0), "10.prov.gz");
+        assertTrue(eventFile.delete());
+        return eventFile;
+    }
+
     @Test
     public void testIndexDirectoryRemoved() throws InterruptedException, IOException, ParseException {
         final RepositoryConfiguration config = createConfiguration();


[3/3] nifi git commit: Merge branch 'NIFI-748' of https://github.com/olegz/nifi into NIFI-748

Posted by ma...@apache.org.
Merge branch 'NIFI-748' of https://github.com/olegz/nifi into NIFI-748


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/453b140d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/453b140d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/453b140d

Branch: refs/heads/master
Commit: 453b140d6bbd73bc63fa3e8cb9e7072eb7709e50
Parents: d16392c 15880f9
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Nov 17 09:23:10 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Nov 17 09:23:10 2015 -0500

----------------------------------------------------------------------
 .../PersistentProvenanceRepository.java         |   1 +
 .../nifi/provenance/lucene/DocsReader.java      | 115 ++++++-------------
 .../nifi/provenance/lucene/IndexSearch.java     |   2 +-
 .../nifi/provenance/lucene/LineageQuery.java    |   2 +-
 .../nifi/provenance/lucene/LuceneUtil.java      |  36 +++++-
 .../TestPersistentProvenanceRepository.java     |  70 +++++++++++
 6 files changed, 144 insertions(+), 82 deletions(-)
----------------------------------------------------------------------



[2/3] nifi git commit: NIFI-748 addressed PR comments - made DocsReader package private - polished logic in read(..) method to avoid escaping the loop - added call to sorting logic in LuceneUtil.groupDocsByStorageFileName(..) to ensure that previous behav

Posted by ma...@apache.org.
NIFI-748 addressed PR comments
- made DocsReader package private
- polished logic in read(..) method to avoid escaping the loop
- added call to sorting logic in LuceneUtil.groupDocsByStorageFileName(..) to ensure that previous behavior and assumptions in read(..) method are preserved
- other minor polishing


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/15880f9f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/15880f9f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/15880f9f

Branch: refs/heads/master
Commit: 15880f9fcc78e7a040e0d64d2a517390122e3fe3
Parents: a4d93c6
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Mon Nov 16 08:39:23 2015 -0500
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Mon Nov 16 08:39:23 2015 -0500

----------------------------------------------------------------------
 .../nifi/provenance/lucene/DocsReader.java      | 21 ++++++++++----------
 .../nifi/provenance/lucene/LuceneUtil.java      | 15 ++++++++++++--
 2 files changed, 24 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/15880f9f/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
index 7bd800b..703a5b8 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
@@ -22,6 +22,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -43,7 +44,7 @@ import org.apache.lucene.search.TopDocs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DocsReader {
+class DocsReader {
     private final Logger logger = LoggerFactory.getLogger(DocsReader.class);
 
     public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles,
@@ -106,12 +107,13 @@ public class DocsReader {
     public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles,
             final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
 
+        if (retrievalCount.get() >= maxResults) {
+            return Collections.emptySet();
+        }
+
         final long start = System.nanoTime();
 
         Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>();
-        if (retrievalCount.get() >= maxResults) {
-            return matchingRecords;
-        }
 
         Map<String, List<Document>> byStorageNameDocGroups = LuceneUtil.groupDocsByStorageFileName(docs);
 
@@ -123,17 +125,16 @@ public class DocsReader {
             if (provenanceEventFile != null) {
                 try (RecordReader reader = RecordReaders.newRecordReader(provenanceEventFile, allProvenanceLogFiles,
                         maxAttributeChars)) {
-                    for (Document document : byStorageNameDocGroups.get(storageFileName)) {
-                        ProvenanceEventRecord eRec = this.getRecord(document, reader);
+
+                    Iterator<Document> docIter = byStorageNameDocGroups.get(storageFileName).iterator();
+                    while (docIter.hasNext() && retrievalCount.incrementAndGet() < maxResults){
+                        ProvenanceEventRecord eRec = this.getRecord(docIter.next(), reader);
                         if (eRec != null) {
                             matchingRecords.add(eRec);
                             eventsReadThisFile++;
-
-                            if (retrievalCount.incrementAndGet() >= maxResults) {
-                                break;
-                            }
                         }
                     }
+
                 } catch (Exception e) {
                     logger.warn("Failed while trying to read Provenance Events. The event file '"
                             + provenanceEventFile.getAbsolutePath() +

http://git-wip-us.apache.org/repos/asf/nifi/blob/15880f9f/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
index aa50b94..08a99d6 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
@@ -130,8 +130,14 @@ public class LuceneUtil {
         return luceneQuery;
     }
 
+    /**
+     * Will sort documents by filename and then file offset so that we can
+     * retrieve the records efficiently
+     *
+     * @param documents
+     *            list of {@link Document}s
+     */
     public static void sortDocsForRetrieval(final List<Document> documents) {
-        // sort by filename and then file offset so that we can retrieve the records efficiently
         Collections.sort(documents, new Comparator<Document>() {
             @Override
             public int compare(final Document o1, final Document o2) {
@@ -167,7 +173,9 @@ public class LuceneUtil {
      * Will group documents based on the {@link FieldNames#STORAGE_FILENAME}.
      *
      * @param documents
-     *            list of {@link Document}s
+     *            list of {@link Document}s which will be sorted via
+     *            {@link #sortDocsForRetrieval(List)} for more efficient record
+     *            retrieval.
      * @return a {@link Map} of document groups with
      *         {@link FieldNames#STORAGE_FILENAME} as key and {@link List} of
      *         {@link Document}s as value.
@@ -181,6 +189,9 @@ public class LuceneUtil {
             }
             documentGroups.get(fileName).add(document);
         }
+        for (List<Document> groupedDocuments : documentGroups.values()) {
+            sortDocsForRetrieval(groupedDocuments);
+        }
         return documentGroups;
     }
 }