You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/04/20 05:13:21 UTC

[44/50] [abbrv] incubator-nifi git commit: NIFI-523: Do not read all Lucene documents when we don't need to

NIFI-523: Do not read all Lucene documents when we don't need to


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a06c2537
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a06c2537
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a06c2537

Branch: refs/heads/NIFI-271
Commit: a06c25373fbd4103c2c9ba1ba0d75d94726700d2
Parents: 509933f
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Apr 17 09:13:57 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Apr 17 09:13:57 2015 -0400

----------------------------------------------------------------------
 .../nifi/provenance/lucene/DocsReader.java      | 20 +++++-----
 .../provenance/serialization/RecordReaders.java | 41 ++++++++++++++++++--
 2 files changed, 47 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a06c2537/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
index af5fe50..6446a35 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
@@ -48,18 +48,22 @@ public class DocsReader {
             return Collections.emptySet();
         }
 
-        final List<Document> docs = new ArrayList<>();
+        final int numDocs = Math.min(topDocs.scoreDocs.length, maxResults);
+        final List<Document> docs = new ArrayList<>(numDocs);
 
-        for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
+        for (final ScoreDoc scoreDoc : topDocs.scoreDocs) {
             final int docId = scoreDoc.doc;
             final Document d = indexReader.document(docId);
             docs.add(d);
+            if ( retrievalCount.incrementAndGet() >= maxResults ) {
+                break;
+            }
         }
 
-        return read(docs, allProvenanceLogFiles, retrievalCount, maxResults);
+        return read(docs, allProvenanceLogFiles);
     }
 
-    public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException {
+    public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles) throws IOException {
         LuceneUtil.sortDocsForRetrieval(docs);
 
         RecordReader reader = null;
@@ -79,9 +83,6 @@ public class DocsReader {
                             reader.skipTo(byteOffset);
                             final StandardProvenanceEventRecord record = reader.nextRecord();
                             matchingRecords.add(record);
-                            if (retrievalCount.incrementAndGet() >= maxResults) {
-                                break;
-                            }
                         } catch (final IOException e) {
                             throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository");
                         }
@@ -91,7 +92,7 @@ public class DocsReader {
                             reader.close();
                         }
 
-                        final List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
+                        List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
                         if (potentialFiles.isEmpty()) {
                             throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository");
                         }
@@ -108,9 +109,6 @@ public class DocsReader {
 
                                 final StandardProvenanceEventRecord record = reader.nextRecord();
                                 matchingRecords.add(record);
-                                if (retrievalCount.incrementAndGet() >= maxResults) {
-                                    break;
-                                }
                             } catch (final IOException e) {
                                 throw new IOException("Failed to retrieve record from Provenance File " + file + " due to " + e, e);
                             }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a06c2537/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
index f902b92..8f06995 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
@@ -33,6 +33,8 @@ import org.apache.nifi.provenance.lucene.LuceneUtil;
 public class RecordReaders {
 
     public static RecordReader newRecordReader(File file, final Collection<Path> provenanceLogFiles) throws IOException {
+        final File originalFile = file;
+        
         if (!file.exists()) {
             if (provenanceLogFiles == null) {
                 throw new FileNotFoundException(file.toString());
@@ -47,11 +49,44 @@ public class RecordReaders {
             }
         }
 
-        if (file == null || !file.exists()) {
-            throw new FileNotFoundException(file.toString());
+        InputStream fis = null;
+        if ( file.exists() ) {
+            try {
+                fis = new FileInputStream(file);
+            } catch (final FileNotFoundException fnfe) {
+                fis = null;
+            }
+        }
+        
+        openStream: while ( fis == null ) {
+            final File dir = file.getParentFile();
+            final String baseName = LuceneUtil.substringBefore(file.getName(), ".");
+            
+            // depending on which rollover actions have occurred, we could have 3 possibilities for the
+            // filename that we need. The majority of the time, we will use the extension ".indexed.prov.gz"
+            // because most often we are compressing on rollover and most often we have already finished
+            // compressing by the time that we are querying the data.
+            for ( final String extension : new String[] {".indexed.prov.gz", ".indexed.prov", ".prov"} ) {
+                file = new File(dir, baseName + extension);
+                if ( file.exists() ) {
+                    try {
+                        fis = new FileInputStream(file);
+                        break openStream;
+                    } catch (final FileNotFoundException fnfe) {
+                        // file was modified by a RolloverAction after we verified that it exists but before we could
+                        // create an InputStream for it. Start over.
+                        fis = null;
+                        continue openStream;
+                    }
+                }
+            }
+            
+            break;
         }
 
-        final InputStream fis = new FileInputStream(file);
+        if ( fis == null ) {
+            throw new FileNotFoundException("Unable to locate file " + originalFile);
+        }
         final InputStream readableStream;
         if (file.getName().endsWith(".gz")) {
             readableStream = new BufferedInputStream(new GZIPInputStream(fis));