You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/17 18:30:58 UTC
[3/5] incubator-nifi git commit: NIFI-523: Do not read all lucene
documents when we don't need to
NIFI-523: Do not read all lucene documents when we don't need to
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a06c2537
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a06c2537
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a06c2537
Branch: refs/heads/develop
Commit: a06c25373fbd4103c2c9ba1ba0d75d94726700d2
Parents: 509933f
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Apr 17 09:13:57 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Apr 17 09:13:57 2015 -0400
----------------------------------------------------------------------
.../nifi/provenance/lucene/DocsReader.java | 20 +++++-----
.../provenance/serialization/RecordReaders.java | 41 ++++++++++++++++++--
2 files changed, 47 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a06c2537/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
index af5fe50..6446a35 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
@@ -48,18 +48,22 @@ public class DocsReader {
return Collections.emptySet();
}
- final List<Document> docs = new ArrayList<>();
+ final int numDocs = Math.min(topDocs.scoreDocs.length, maxResults);
+ final List<Document> docs = new ArrayList<>(numDocs);
- for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
+ for (final ScoreDoc scoreDoc : topDocs.scoreDocs) {
final int docId = scoreDoc.doc;
final Document d = indexReader.document(docId);
docs.add(d);
+ if ( retrievalCount.incrementAndGet() >= maxResults ) {
+ break;
+ }
}
- return read(docs, allProvenanceLogFiles, retrievalCount, maxResults);
+ return read(docs, allProvenanceLogFiles);
}
- public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException {
+ public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles) throws IOException {
LuceneUtil.sortDocsForRetrieval(docs);
RecordReader reader = null;
@@ -79,9 +83,6 @@ public class DocsReader {
reader.skipTo(byteOffset);
final StandardProvenanceEventRecord record = reader.nextRecord();
matchingRecords.add(record);
- if (retrievalCount.incrementAndGet() >= maxResults) {
- break;
- }
} catch (final IOException e) {
throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository");
}
@@ -91,7 +92,7 @@ public class DocsReader {
reader.close();
}
- final List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
+ List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
if (potentialFiles.isEmpty()) {
throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository");
}
@@ -108,9 +109,6 @@ public class DocsReader {
final StandardProvenanceEventRecord record = reader.nextRecord();
matchingRecords.add(record);
- if (retrievalCount.incrementAndGet() >= maxResults) {
- break;
- }
} catch (final IOException e) {
throw new IOException("Failed to retrieve record from Provenance File " + file + " due to " + e, e);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a06c2537/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
index f902b92..8f06995 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
@@ -33,6 +33,8 @@ import org.apache.nifi.provenance.lucene.LuceneUtil;
public class RecordReaders {
public static RecordReader newRecordReader(File file, final Collection<Path> provenanceLogFiles) throws IOException {
+ final File originalFile = file;
+
if (!file.exists()) {
if (provenanceLogFiles == null) {
throw new FileNotFoundException(file.toString());
@@ -47,11 +49,44 @@ public class RecordReaders {
}
}
- if (file == null || !file.exists()) {
- throw new FileNotFoundException(file.toString());
+ InputStream fis = null;
+ if ( file.exists() ) {
+ try {
+ fis = new FileInputStream(file);
+ } catch (final FileNotFoundException fnfe) {
+ fis = null;
+ }
+ }
+
+ openStream: while ( fis == null ) {
+ final File dir = file.getParentFile();
+ final String baseName = LuceneUtil.substringBefore(file.getName(), ".");
+
+ // depending on which rollover actions have occurred, we could have 3 possibilities for the
+ // filename that we need. The majority of the time, we will use the extension ".indexed.prov.gz"
+ // because most often we are compressing on rollover and most often we have already finished
+ // compressing by the time that we are querying the data.
+ for ( final String extension : new String[] {".indexed.prov.gz", ".indexed.prov", ".prov"} ) {
+ file = new File(dir, baseName + extension);
+ if ( file.exists() ) {
+ try {
+ fis = new FileInputStream(file);
+ break openStream;
+ } catch (final FileNotFoundException fnfe) {
+ // file was modified by a RolloverAction after we verified that it exists but before we could
+ // create an InputStream for it. Start over.
+ fis = null;
+ continue openStream;
+ }
+ }
+ }
+
+ break;
}
- final InputStream fis = new FileInputStream(file);
+ if ( fis == null ) {
+ throw new FileNotFoundException("Unable to locate file " + originalFile);
+ }
final InputStream readableStream;
if (file.getName().endsWith(".gz")) {
readableStream = new BufferedInputStream(new GZIPInputStream(fis));