Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/27 20:14:59 UTC
[1/4] incubator-nifi git commit: NIFI-527: Code cleanup
Repository: incubator-nifi
Updated Branches:
refs/heads/develop 10860944d -> 384b2ac25
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
index 8944cec..7c13a2a 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
@@ -24,9 +24,9 @@ import java.io.IOException;
/**
* Standard implementation of TocReader.
- *
+ *
* Expects .toc file to be in the following format;
- *
+ *
* byte 0: version
* byte 1: boolean: compressionFlag -> 0 = journal is NOT compressed, 1 = journal is compressed
* byte 2-9: long: offset of block 0
@@ -37,21 +37,21 @@ import java.io.IOException;
public class StandardTocReader implements TocReader {
private final boolean compressed;
private final long[] offsets;
-
+
public StandardTocReader(final File file) throws IOException {
try (final FileInputStream fis = new FileInputStream(file);
- final DataInputStream dis = new DataInputStream(fis)) {
-
+ final DataInputStream dis = new DataInputStream(fis)) {
+
final int version = dis.read();
if ( version < 0 ) {
throw new EOFException();
}
-
+
final int compressionFlag = dis.read();
if ( compressionFlag < 0 ) {
throw new EOFException();
}
-
+
if ( compressionFlag == 0 ) {
compressed = false;
} else if ( compressionFlag == 1 ) {
@@ -59,21 +59,21 @@ public class StandardTocReader implements TocReader {
} else {
throw new IOException("Table of Contents appears to be corrupt: could not read 'compression flag' from header; expected value of 0 or 1 but got " + compressionFlag);
}
-
+
final int numBlocks = (int) ((file.length() - 2) / 8);
offsets = new long[numBlocks];
-
+
for (int i=0; i < numBlocks; i++) {
offsets[i] = dis.readLong();
}
}
}
-
+
@Override
public boolean isCompressed() {
return compressed;
}
-
+
@Override
public long getBlockOffset(final int blockIndex) {
if ( blockIndex >= offsets.length ) {
@@ -89,20 +89,20 @@ public class StandardTocReader implements TocReader {
}
return offsets[offsets.length - 1];
}
-
+
@Override
public void close() throws IOException {
}
- @Override
- public int getBlockIndex(final long blockOffset) {
- for (int i=0; i < offsets.length; i++) {
- if ( offsets[i] > blockOffset ) {
- return i-1;
- }
- }
-
- return offsets.length - 1;
- }
+ @Override
+ public int getBlockIndex(final long blockOffset) {
+ for (int i=0; i < offsets.length; i++) {
+ if ( offsets[i] > blockOffset ) {
+ return i-1;
+ }
+ }
+
+ return offsets.length - 1;
+ }
}
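
For context on the layout the reader above parses, here is a minimal standalone sketch of reading the .toc format described in the class Javadoc (a version byte, a compression byte, then one 8-byte offset per block). The class and method names below are illustrative only and are not part of the NiFi codebase:

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

public class TocParseSketch {
    public static long[] readOffsets(final File tocFile) throws IOException {
        try (final DataInputStream dis = new DataInputStream(new FileInputStream(tocFile))) {
            final int version = dis.read();          // byte 0: format version
            final int compressionFlag = dis.read();  // byte 1: 0 = uncompressed, 1 = compressed
            if (version < 0 || compressionFlag < 0) {
                throw new EOFException("Truncated TOC header");
            }
            // every byte after the 2-byte header is part of an 8-byte block offset
            final int numBlocks = (int) ((tocFile.length() - 2) / 8);
            final long[] offsets = new long[numBlocks];
            for (int i = 0; i < numBlocks; i++) {
                offsets[i] = dis.readLong();
            }
            return offsets;
        }
    }
}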
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
index 488f225..10de459 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
@@ -19,7 +19,6 @@ package org.apache.nifi.provenance.toc;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
@@ -29,7 +28,7 @@ import org.slf4j.LoggerFactory;
/**
* Standard implementation of {@link TocWriter}.
- *
+ *
* Format of .toc file:
* byte 0: version
* byte 1: compressed: 0 -> not compressed, 1 -> compressed
@@ -39,27 +38,27 @@ import org.slf4j.LoggerFactory;
* byte (N*8+2)-(N*8+9): long: offset of block N
*/
public class StandardTocWriter implements TocWriter {
- private static final Logger logger = LoggerFactory.getLogger(StandardTocWriter.class);
-
+ private static final Logger logger = LoggerFactory.getLogger(StandardTocWriter.class);
+
public static final byte VERSION = 1;
-
+
private final File file;
private final FileOutputStream fos;
private final boolean alwaysSync;
private int index = -1;
-
+
/**
* Creates a StandardTocWriter that writes to the given file.
* @param file the file to write to
* @param compressionFlag whether or not the journal is compressed
- * @throws FileNotFoundException
+ * @throws IOException if unable to write header info to the specified file
*/
public StandardTocWriter(final File file, final boolean compressionFlag, final boolean alwaysSync) throws IOException {
final File tocDir = file.getParentFile();
if ( !tocDir.exists() ) {
- Files.createDirectories(tocDir.toPath());
+ Files.createDirectories(tocDir.toPath());
}
-
+
this.file = file;
fos = new FileOutputStream(file);
this.alwaysSync = alwaysSync;
@@ -69,12 +68,12 @@ public class StandardTocWriter implements TocWriter {
header[1] = (byte) (compressionFlag ? 1 : 0);
fos.write(header);
fos.flush();
-
+
if ( alwaysSync ) {
sync();
}
}
-
+
@Override
public void addBlockOffset(final long offset) throws IOException {
final BufferedOutputStream bos = new BufferedOutputStream(fos);
@@ -83,17 +82,17 @@ public class StandardTocWriter implements TocWriter {
dos.flush();
index++;
logger.debug("Adding block {} at offset {}", index, offset);
-
+
if ( alwaysSync ) {
sync();
}
}
-
+
@Override
public void sync() throws IOException {
- fos.getFD().sync();
+ fos.getFD().sync();
}
-
+
@Override
public int getCurrentBlockIndex() {
return index;
@@ -104,15 +103,15 @@ public class StandardTocWriter implements TocWriter {
if (alwaysSync) {
fos.getFD().sync();
}
-
+
fos.close();
}
-
+
@Override
public File getFile() {
return file;
}
-
+
@Override
public String toString() {
return "TOC Writer for " + file;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
index 7c197be..97e2838 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
@@ -32,27 +32,31 @@ public interface TocReader extends Closeable {
/**
* Indicates whether or not the corresponding Journal file is compressed
- * @return
+ * @return <code>true</code> if the event file is compressed
*/
boolean isCompressed();
/**
* Returns the byte offset into the Journal File for the Block with the given index.
- * @param blockIndex
- * @return
+ *
+ * @param blockIndex the block index to get the byte offset for
+ * @return the byte offset for the given block index, or <code>-1</code> if the given block index
+ * does not exist
*/
long getBlockOffset(int blockIndex);
-
+
/**
* Returns the byte offset into the Journal File of the last Block in the given index
- * @return
+ * @return the byte offset into the Journal File of the last Block in the given index
*/
long getLastBlockOffset();
-
+
/**
* Returns the index of the block that contains the given offset
- * @param blockOffset
- * @return
+ *
+ * @param blockOffset the byte offset for which the block index is desired
+ *
+ * @return the index of the block that contains the given offset
*/
int getBlockIndex(long blockOffset);
}
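
Because block offsets are written in ascending order, getBlockIndex(offset) resolves to the last block whose start offset is less than or equal to the given offset. A short sketch of that lookup over a plain long[] array, mirroring the StandardTocReader implementation earlier in this commit (a binary search would also work, since the array is sorted):

public static int blockIndexFor(final long[] offsets, final long byteOffset) {
    // offsets are ascending; find the last block whose start is <= byteOffset
    for (int i = 0; i < offsets.length; i++) {
        if (offsets[i] > byteOffset) {
            return i - 1;  // the previous block contains the offset
        }
    }
    return offsets.length - 1;  // the offset falls within the final block
}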
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
index c30ac98..3fa7d67 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
@@ -22,16 +22,19 @@ import org.apache.nifi.provenance.lucene.LuceneUtil;
public class TocUtil {
- /**
- * Returns the file that should be used as the Table of Contents for the given Journal File
- * @param journalFile
- * @return
- */
- public static File getTocFile(final File journalFile) {
- final File tocDir = new File(journalFile.getParentFile(), "toc");
- final String basename = LuceneUtil.substringBefore(journalFile.getName(), ".");
- final File tocFile = new File(tocDir, basename + ".toc");
- return tocFile;
- }
-
+ /**
+ * Returns the file that should be used as the Table of Contents for the given Journal File.
+ * Note, if no TOC exists for the given Journal File, a File will still be returned but the file
+ * will not actually exist.
+ *
+ * @param journalFile the journal file for which to get the Table of Contents
+ * @return the file that represents the Table of Contents for the specified journal file.
+ */
+ public static File getTocFile(final File journalFile) {
+ final File tocDir = new File(journalFile.getParentFile(), "toc");
+ final String basename = LuceneUtil.substringBefore(journalFile.getName(), ".");
+ final File tocFile = new File(tocDir, basename + ".toc");
+ return tocFile;
+ }
+
}
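
A brief usage note for the mapping above: the TOC for a journal lives in a "toc" subdirectory next to the journal, named after everything before the journal file name's first dot. With an illustrative path:

final File journal = new File("/repo/journals/123456.prov.gz");  // illustrative path
final File toc = TocUtil.getTocFile(journal);
// toc -> /repo/journals/toc/123456.toc (returned even if the file does not yet exist)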
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
index c678053..38f910f 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
@@ -27,26 +27,24 @@ public interface TocWriter extends Closeable {
/**
* Adds the given block offset as the next Block Offset in the Table of Contents
- * @param offset
- * @throws IOException
+ * @param offset the byte offset at which the block begins
+ * @throws IOException if unable to persist the block index
*/
void addBlockOffset(long offset) throws IOException;
-
+
/**
- * Returns the index of the current Block
- * @return
+ * @return the index of the current Block
*/
int getCurrentBlockIndex();
-
+
/**
- * Returns the file that is currently being written to
- * @return
+ * @return the file that is currently being written to
*/
File getFile();
/**
* Synchronizes the data with the underlying storage device
- * @throws IOException
+ * @throws IOException if unable to synchronize the data with the underlying storage device
*/
void sync() throws IOException;
}
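
Taken together, the two interfaces support a simple write-then-read round trip, which the tests later in this commit exercise. A condensed sketch using the Standard implementations from this bundle, assuming the org.apache.nifi.provenance.toc classes are on the classpath; the file path and offsets are illustrative:

import java.io.File;
import java.io.IOException;

public class TocRoundTripSketch {
    public static void main(final String[] args) throws IOException {
        final File tocFile = new File("target/example.toc");
        try (final TocWriter writer = new StandardTocWriter(tocFile, false, false)) {
            writer.addBlockOffset(0L);     // block 0 begins at byte 0 of the journal
            writer.addBlockOffset(4096L);  // block 1 begins at byte 4096
        }
        try (final TocReader reader = new StandardTocReader(tocFile)) {
            final int index = reader.getBlockIndex(5000L);    // -> 1
            final long offset = reader.getBlockOffset(index); // -> 4096
            System.out.println("offset 5000 is in block " + index + " starting at byte " + offset);
        }
    }
}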
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 5541ab5..7d97bcd 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -75,7 +75,7 @@ public class TestPersistentProvenanceRepository {
private PersistentProvenanceRepository repo;
private RepositoryConfiguration config;
-
+
public static final int DEFAULT_ROLLOVER_MILLIS = 2000;
private RepositoryConfiguration createConfiguration() {
@@ -89,9 +89,9 @@ public class TestPersistentProvenanceRepository {
@BeforeClass
public static void setLogLevel() {
- System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
+ System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
}
-
+
@Before
public void printTestName() {
System.out.println("\n\n\n*********************** " + name.getMethodName() + " *****************************");
@@ -105,33 +105,33 @@ public class TestPersistentProvenanceRepository {
} catch (final IOException ioe) {
}
}
-
+
// Delete all of the storage files. We do this in order to clean up the tons of files that
// we create but also to ensure that we have closed all of the file handles. If we leave any
// streams open, for instance, this will throw an IOException, causing our unit test to fail.
for ( final File storageDir : config.getStorageDirectories() ) {
- int i;
- for (i=0; i < 3; i++) {
- try {
- FileUtils.deleteFile(storageDir, true);
- break;
- } catch (final IOException ioe) {
- // if there is a virus scanner, etc. running in the background we may not be able to
- // delete the file. Wait a sec and try again.
- if ( i == 2 ) {
- throw ioe;
- } else {
- try {
- Thread.sleep(1000L);
- } catch (final InterruptedException ie) {
- }
- }
- }
- }
+ int i;
+ for (i=0; i < 3; i++) {
+ try {
+ FileUtils.deleteFile(storageDir, true);
+ break;
+ } catch (final IOException ioe) {
+ // if there is a virus scanner, etc. running in the background we may not be able to
+ // delete the file. Wait a sec and try again.
+ if ( i == 2 ) {
+ throw ioe;
+ } else {
+ try {
+ Thread.sleep(1000L);
+ } catch (final InterruptedException ie) {
+ }
+ }
+ }
+ }
}
}
-
+
private EventReporter getEventReporter() {
return new EventReporter() {
@@ -241,7 +241,7 @@ public class TestPersistentProvenanceRepository {
}
Thread.sleep(1000L);
-
+
repo.close();
Thread.sleep(500L); // Give the repo time to shutdown (i.e., close all file handles, etc.)
@@ -431,7 +431,7 @@ public class TestPersistentProvenanceRepository {
repo.waitForRollover();
final Query query = new Query(UUID.randomUUID().toString());
-// query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000*"));
+ // query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*"));
@@ -905,14 +905,14 @@ public class TestPersistentProvenanceRepository {
secondRepo.initialize(getEventReporter());
try {
- final ProvenanceEventRecord event11 = builder.build();
- secondRepo.registerEvent(event11);
- secondRepo.waitForRollover();
- final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L);
- assertNotNull(event11Retrieved);
- assertEquals(10, event11Retrieved.getEventId());
+ final ProvenanceEventRecord event11 = builder.build();
+ secondRepo.registerEvent(event11);
+ secondRepo.waitForRollover();
+ final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L);
+ assertNotNull(event11Retrieved);
+ assertEquals(10, event11Retrieved.getEventId());
} finally {
- secondRepo.close();
+ secondRepo.close();
}
}
@@ -983,26 +983,26 @@ public class TestPersistentProvenanceRepository {
storageDirFiles = config.getStorageDirectories().get(0).listFiles(indexFileFilter);
assertEquals(0, storageDirFiles.length);
}
-
-
+
+
@Test
public void testBackPressure() throws IOException, InterruptedException {
final RepositoryConfiguration config = createConfiguration();
- config.setMaxEventFileCapacity(1L); // force rollover on each record.
+ config.setMaxEventFileCapacity(1L); // force rollover on each record.
config.setJournalCount(1);
-
+
final AtomicInteger journalCountRef = new AtomicInteger(0);
-
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
- @Override
- protected int getJournalCount() {
- return journalCountRef.get();
- }
- };
+
+ repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
+ @Override
+ protected int getJournalCount() {
+ return journalCountRef.get();
+ }
+ };
repo.initialize(getEventReporter());
- final Map<String, String> attributes = new HashMap<>();
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+ final Map<String, String> attributes = new HashMap<>();
+ final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.setEventTime(System.currentTimeMillis());
builder.setEventType(ProvenanceEventType.RECEIVE);
builder.setTransitUri("nifi://unit-test");
@@ -1023,31 +1023,31 @@ public class TestPersistentProvenanceRepository {
final AtomicLong threadNanos = new AtomicLong(0L);
final Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- final long start = System.nanoTime();
- builder.fromFlowFile(createFlowFile(13, 3000L, attributes));
- attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 13);
- repo.registerEvent(builder.build());
- threadNanos.set(System.nanoTime() - start);
- }
+ @Override
+ public void run() {
+ final long start = System.nanoTime();
+ builder.fromFlowFile(createFlowFile(13, 3000L, attributes));
+ attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 13);
+ repo.registerEvent(builder.build());
+ threadNanos.set(System.nanoTime() - start);
+ }
});
t.start();
Thread.sleep(1500L);
-
+
journalCountRef.set(1);
t.join();
-
+
final int threadMillis = (int) TimeUnit.NANOSECONDS.toMillis(threadNanos.get());
- assertTrue(threadMillis > 1200); // use 1200 to account for the fact that the timing is not exact
-
+ assertTrue(threadMillis > 1200); // use 1200 to account for the fact that the timing is not exact
+
builder.fromFlowFile(createFlowFile(15, 3000L, attributes));
attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 15);
repo.registerEvent(builder.build());
}
-
-
+
+
// TODO: test EOF on merge
// TODO: Test journal with no records
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
index 6f85b94..136f244 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
@@ -40,15 +40,15 @@ import org.junit.Test;
public class TestStandardRecordReaderWriter {
@BeforeClass
public static void setLogLevel() {
- System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
+ System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
}
- private ProvenanceEventRecord createEvent() {
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("filename", "1.txt");
+ private ProvenanceEventRecord createEvent() {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("filename", "1.txt");
attributes.put("uuid", UUID.randomUUID().toString());
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+ final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.setEventTime(System.currentTimeMillis());
builder.setEventType(ProvenanceEventType.RECEIVE);
builder.setTransitUri("nifi://unit-test");
@@ -58,132 +58,132 @@ public class TestStandardRecordReaderWriter {
final ProvenanceEventRecord record = builder.build();
return record;
- }
-
- @Test
- public void testSimpleWriteWithToc() throws IOException {
+ }
+
+ @Test
+ public void testSimpleWriteWithToc() throws IOException {
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite");
final File tocFile = TocUtil.getTocFile(journalFile);
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, false, 1024 * 1024);
-
+
writer.writeHeader();
writer.writeRecord(createEvent(), 1L);
writer.close();
final TocReader tocReader = new StandardTocReader(tocFile);
-
+
try (final FileInputStream fis = new FileInputStream(journalFile);
- final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
- assertEquals(0, reader.getBlockIndex());
- reader.skipToBlock(0);
- StandardProvenanceEventRecord recovered = reader.nextRecord();
- assertNotNull(recovered);
-
- assertEquals("nifi://unit-test", recovered.getTransitUri());
- assertNull(reader.nextRecord());
+ final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+ assertEquals(0, reader.getBlockIndex());
+ reader.skipToBlock(0);
+ StandardProvenanceEventRecord recovered = reader.nextRecord();
+ assertNotNull(recovered);
+
+ assertEquals("nifi://unit-test", recovered.getTransitUri());
+ assertNull(reader.nextRecord());
}
-
+
FileUtils.deleteFile(journalFile.getParentFile(), true);
- }
-
-
- @Test
- public void testSingleRecordCompressed() throws IOException {
+ }
+
+
+ @Test
+ public void testSingleRecordCompressed() throws IOException {
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
final File tocFile = TocUtil.getTocFile(journalFile);
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100);
-
+
writer.writeHeader();
writer.writeRecord(createEvent(), 1L);
writer.close();
final TocReader tocReader = new StandardTocReader(tocFile);
-
+
try (final FileInputStream fis = new FileInputStream(journalFile);
- final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
- assertEquals(0, reader.getBlockIndex());
- reader.skipToBlock(0);
- StandardProvenanceEventRecord recovered = reader.nextRecord();
- assertNotNull(recovered);
-
- assertEquals("nifi://unit-test", recovered.getTransitUri());
- assertNull(reader.nextRecord());
+ final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+ assertEquals(0, reader.getBlockIndex());
+ reader.skipToBlock(0);
+ StandardProvenanceEventRecord recovered = reader.nextRecord();
+ assertNotNull(recovered);
+
+ assertEquals("nifi://unit-test", recovered.getTransitUri());
+ assertNull(reader.nextRecord());
}
-
+
FileUtils.deleteFile(journalFile.getParentFile(), true);
- }
-
-
- @Test
- public void testMultipleRecordsSameBlockCompressed() throws IOException {
+ }
+
+
+ @Test
+ public void testMultipleRecordsSameBlockCompressed() throws IOException {
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
final File tocFile = TocUtil.getTocFile(journalFile);
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
// new record each 1 MB of uncompressed data
final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 1024 * 1024);
-
+
writer.writeHeader();
for (int i=0; i < 10; i++) {
- writer.writeRecord(createEvent(), i);
+ writer.writeRecord(createEvent(), i);
}
writer.close();
final TocReader tocReader = new StandardTocReader(tocFile);
-
+
try (final FileInputStream fis = new FileInputStream(journalFile);
- final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
- for (int i=0; i < 10; i++) {
- assertEquals(0, reader.getBlockIndex());
-
- // call skipToBlock half the time to ensure that we can; avoid calling it
- // the other half of the time to ensure that it's okay.
- if (i <= 5) {
- reader.skipToBlock(0);
- }
-
- StandardProvenanceEventRecord recovered = reader.nextRecord();
- assertNotNull(recovered);
- assertEquals("nifi://unit-test", recovered.getTransitUri());
- }
-
- assertNull(reader.nextRecord());
+ final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+ for (int i=0; i < 10; i++) {
+ assertEquals(0, reader.getBlockIndex());
+
+ // call skipToBlock half the time to ensure that we can; avoid calling it
+ // the other half of the time to ensure that it's okay.
+ if (i <= 5) {
+ reader.skipToBlock(0);
+ }
+
+ StandardProvenanceEventRecord recovered = reader.nextRecord();
+ assertNotNull(recovered);
+ assertEquals("nifi://unit-test", recovered.getTransitUri());
+ }
+
+ assertNull(reader.nextRecord());
}
-
+
FileUtils.deleteFile(journalFile.getParentFile(), true);
- }
-
-
- @Test
- public void testMultipleRecordsMultipleBlocksCompressed() throws IOException {
+ }
+
+
+ @Test
+ public void testMultipleRecordsMultipleBlocksCompressed() throws IOException {
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
final File tocFile = TocUtil.getTocFile(journalFile);
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
// new block each 10 bytes
final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100);
-
+
writer.writeHeader();
for (int i=0; i < 10; i++) {
- writer.writeRecord(createEvent(), i);
+ writer.writeRecord(createEvent(), i);
}
writer.close();
final TocReader tocReader = new StandardTocReader(tocFile);
-
+
try (final FileInputStream fis = new FileInputStream(journalFile);
- final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
- for (int i=0; i < 10; i++) {
- StandardProvenanceEventRecord recovered = reader.nextRecord();
- System.out.println(recovered);
- assertNotNull(recovered);
- assertEquals((long) i, recovered.getEventId());
- assertEquals("nifi://unit-test", recovered.getTransitUri());
- }
-
- assertNull(reader.nextRecord());
+ final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+ for (int i=0; i < 10; i++) {
+ StandardProvenanceEventRecord recovered = reader.nextRecord();
+ System.out.println(recovered);
+ assertNotNull(recovered);
+ assertEquals((long) i, recovered.getEventId());
+ assertEquals("nifi://unit-test", recovered.getTransitUri());
+ }
+
+ assertNull(reader.nextRecord());
}
-
+
FileUtils.deleteFile(journalFile.getParentFile(), true);
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
index 7459fe8..eb0f736 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
@@ -24,7 +24,7 @@ import java.util.Set;
import org.apache.nifi.flowfile.FlowFile;
public class TestUtil {
- public static FlowFile createFlowFile(final long id, final long fileSize, final Map<String, String> attributes) {
+ public static FlowFile createFlowFile(final long id, final long fileSize, final Map<String, String> attributes) {
final Map<String, String> attrCopy = new HashMap<>(attributes);
return new FlowFile() {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
index 30326e7..87400a0 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
@@ -38,7 +38,7 @@ public class TestStandardTocReader {
out.write(0);
out.write(0);
}
-
+
try {
try(final StandardTocReader reader = new StandardTocReader(file)) {
assertFalse(reader.isCompressed());
@@ -46,13 +46,13 @@ public class TestStandardTocReader {
} finally {
file.delete();
}
-
-
+
+
try (final OutputStream out = new FileOutputStream(file)) {
out.write(0);
out.write(1);
}
-
+
try {
try(final StandardTocReader reader = new StandardTocReader(file)) {
assertTrue(reader.isCompressed());
@@ -61,25 +61,25 @@ public class TestStandardTocReader {
file.delete();
}
}
-
-
+
+
@Test
public void testGetBlockIndex() throws IOException {
final File file = new File("target/" + UUID.randomUUID().toString());
try (final OutputStream out = new FileOutputStream(file);
- final DataOutputStream dos = new DataOutputStream(out)) {
+ final DataOutputStream dos = new DataOutputStream(out)) {
out.write(0);
out.write(0);
-
+
for (int i=0; i < 1024; i++) {
dos.writeLong(i * 1024L);
}
}
-
+
try {
try(final StandardTocReader reader = new StandardTocReader(file)) {
assertFalse(reader.isCompressed());
-
+
for (int i=0; i < 1024; i++) {
assertEquals(i * 1024, reader.getBlockOffset(i));
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
index 70f55a2..aebe0d5 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
@@ -31,12 +31,12 @@ public class TestStandardTocWriter {
final File tocFile = new File("target/" + UUID.randomUUID().toString() + ".toc");
try {
assertTrue( tocFile.createNewFile() );
-
+
try (final StandardTocWriter writer = new StandardTocWriter(tocFile, false, false)) {
}
} finally {
FileUtils.deleteFile(tocFile, false);
}
}
-
+
}
[4/4] incubator-nifi git commit: Merge branch 'develop' of http://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
Posted by ma...@apache.org.
Merge branch 'develop' of http://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/384b2ac2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/384b2ac2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/384b2ac2
Branch: refs/heads/develop
Commit: 384b2ac2535987a42ae36568d285c829461a1587
Parents: 3cd18b0 1086094
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Apr 27 14:14:48 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Apr 27 14:14:48 2015 -0400
----------------------------------------------------------------------
nifi-parent/pom.xml | 6 +-
.../nifi-hadoop-bundle/nifi-hadoop-nar/pom.xml | 28 +--
.../hadoop/AbstractHadoopProcessor.java | 7 +-
.../hadoop/CreateHadoopSequenceFile.java | 28 +--
.../apache/nifi/processors/hadoop/GetHDFS.java | 60 +++----
.../processors/hadoop/GetHDFSSequenceFile.java | 18 +-
.../nifi/processors/hadoop/KeyValueReader.java | 6 +-
.../nifi/processors/hadoop/ValueReader.java | 5 +-
.../hadoop/util/ByteFilteringOutputStream.java | 24 +--
.../hadoop/util/InputStreamWritable.java | 6 +-
.../hadoop/util/OutputStreamWritable.java | 3 +-
.../hadoop/util/SequenceFileWriter.java | 12 +-
.../nifi/processors/standard/BinFiles.java | 15 +-
.../processors/standard/CompressContent.java | 9 +-
.../nifi/processors/standard/ControlRate.java | 11 +-
.../standard/ConvertCharacterSet.java | 22 ++-
.../processors/standard/DistributeLoad.java | 17 +-
.../processors/standard/EvaluateJsonPath.java | 36 ++--
.../nifi/processors/standard/EvaluateXPath.java | 9 +-
.../processors/standard/EvaluateXQuery.java | 6 +-
.../processors/standard/ExecuteProcess.java | 9 +-
.../standard/ExecuteStreamCommand.java | 21 +--
.../nifi/processors/standard/ExtractText.java | 3 +-
.../processors/standard/GenerateFlowFile.java | 3 +-
.../nifi/processors/standard/GetFile.java | 6 +-
.../nifi/processors/standard/GetJMSTopic.java | 3 +-
.../processors/standard/HandleHttpRequest.java | 17 +-
.../processors/standard/HandleHttpResponse.java | 3 +-
.../nifi/processors/standard/HashAttribute.java | 10 +-
.../nifi/processors/standard/InvokeHTTP.java | 24 ++-
.../nifi/processors/standard/JmsConsumer.java | 6 +-
.../nifi/processors/standard/ListenUDP.java | 178 +++++++++----------
.../nifi/processors/standard/MergeContent.java | 6 +-
.../nifi/processors/standard/PostHTTP.java | 47 +++--
.../nifi/processors/standard/PutEmail.java | 6 +-
.../apache/nifi/processors/standard/PutFTP.java | 9 +-
.../processors/standard/PutFileTransfer.java | 11 +-
.../apache/nifi/processors/standard/PutJMS.java | 3 +-
.../nifi/processors/standard/PutSFTP.java | 6 +-
.../nifi/processors/standard/ReplaceText.java | 28 ++-
.../standard/ReplaceTextWithMapping.java | 13 +-
.../processors/standard/RouteOnAttribute.java | 6 +-
.../nifi/processors/standard/ScanAttribute.java | 5 +-
.../nifi/processors/standard/SplitContent.java | 9 +-
.../nifi/processors/standard/SplitText.java | 18 +-
.../nifi/processors/standard/SplitXml.java | 3 +-
.../nifi/processors/standard/TransformXml.java | 52 +++---
.../nifi/processors/standard/UnpackContent.java | 39 ++--
.../nifi/processors/standard/ValidateXml.java | 51 +++---
.../servlets/ContentAcknowledgmentServlet.java | 5 -
.../standard/servlets/ListenHTTPServlet.java | 5 -
.../nifi/processors/standard/util/Bin.java | 22 +--
.../processors/standard/util/BinManager.java | 2 +-
.../standard/util/DocumentReaderCallback.java | 10 +-
.../processors/standard/util/FTPTransfer.java | 3 +-
.../nifi/processors/standard/util/FTPUtils.java | 2 +-
.../processors/standard/util/FileTransfer.java | 29 ++-
.../processors/standard/util/SFTPTransfer.java | 9 +-
.../standard/util/XmlSplitterSaxParser.java | 11 +-
.../processors/standard/TestDistributeLoad.java | 3 +-
.../standard/TestHandleHttpRequest.java | 4 +-
.../distributed/cache/client/CommsSession.java | 16 +-
.../DistributedMapCacheClientService.java | 7 +-
.../DistributedSetCacheClientService.java | 6 +-
.../cache/client/SSLCommsSession.java | 25 +--
.../cache/client/StandardCommsSession.java | 1 +
.../additionalDetails.html | 60 +++----
.../cache/server/AbstractCacheServer.java | 25 +--
.../distributed/cache/server/CacheRecord.java | 12 +-
.../distributed/cache/server/CacheServer.java | 3 +-
.../cache/server/DistributedCacheServer.java | 3 +-
.../cache/server/DistributedSetCacheServer.java | 13 +-
.../cache/server/EvictionPolicy.java | 24 +--
.../cache/server/SetCacheServer.java | 25 +--
.../server/map/DistributedMapCacheServer.java | 12 +-
.../distributed/cache/server/map/MapCache.java | 4 +
.../cache/server/map/MapCacheRecord.java | 19 +-
.../cache/server/map/MapCacheServer.java | 113 ++++++------
.../cache/server/map/MapPutResult.java | 5 +-
.../cache/server/map/PersistentMapCache.java | 51 +++---
.../cache/server/map/SimpleMapCache.java | 47 ++---
.../cache/server/set/PersistentSetCache.java | 57 +++---
.../distributed/cache/server/set/SetCache.java | 5 +-
.../cache/server/set/SetCacheRecord.java | 15 +-
.../cache/server/set/SetCacheResult.java | 11 +-
.../cache/server/set/SimpleSetCache.java | 41 ++---
.../additionalDetails.html | 62 +++----
.../cache/server/TestServerAndClient.java | 9 +-
.../nifi-http-context-map-api/pom.xml | 34 ++--
.../org/apache/nifi/http/HttpContextMap.java | 45 +++--
.../nifi-http-context-map/pom.xml | 20 +--
.../nifi/http/StandardHttpContextMap.java | 83 ++++-----
.../index.html | 36 ++--
.../nifi/ssl/StandardSSLContextService.java | 3 +-
.../apache/nifi/ssl/SSLContextServiceTest.java | 4 +-
95 files changed, 916 insertions(+), 1008 deletions(-)
----------------------------------------------------------------------
[3/4] incubator-nifi git commit: NIFI-527: Code cleanup
Posted by ma...@apache.org.
NIFI-527: Code cleanup
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/3cd18b0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/3cd18b0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/3cd18b0b
Branch: refs/heads/develop
Commit: 3cd18b0babc5133e35a2771bc0d0acaf974c381f
Parents: 666de3d
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Apr 27 14:13:55 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Apr 27 14:13:55 2015 -0400
----------------------------------------------------------------------
.../nifi/provenance/IndexConfiguration.java | 12 +-
.../PersistentProvenanceRepository.java | 612 +++++++-------
.../provenance/RepositoryConfiguration.java | 106 +--
.../nifi/provenance/StandardRecordReader.java | 246 +++---
.../nifi/provenance/StandardRecordWriter.java | 138 ++--
.../provenance/expiration/ExpirationAction.java | 6 +-
.../provenance/lucene/DeleteIndexAction.java | 12 +-
.../nifi/provenance/lucene/DocsReader.java | 79 +-
.../nifi/provenance/lucene/IndexManager.java | 820 +++++++++----------
.../nifi/provenance/lucene/IndexSearch.java | 38 +-
.../nifi/provenance/lucene/IndexingAction.java | 119 +--
.../nifi/provenance/lucene/LineageQuery.java | 6 +-
.../nifi/provenance/lucene/LuceneUtil.java | 38 +-
.../provenance/rollover/CompressionAction.java | 59 --
.../provenance/rollover/RolloverAction.java | 35 -
.../provenance/serialization/RecordReader.java | 57 +-
.../provenance/serialization/RecordReaders.java | 136 +--
.../provenance/serialization/RecordWriter.java | 23 +-
.../provenance/serialization/RecordWriters.java | 8 +-
.../nifi/provenance/toc/StandardTocReader.java | 44 +-
.../nifi/provenance/toc/StandardTocWriter.java | 35 +-
.../apache/nifi/provenance/toc/TocReader.java | 20 +-
.../org/apache/nifi/provenance/toc/TocUtil.java | 27 +-
.../apache/nifi/provenance/toc/TocWriter.java | 16 +-
.../TestPersistentProvenanceRepository.java | 118 +--
.../TestStandardRecordReaderWriter.java | 162 ++--
.../org/apache/nifi/provenance/TestUtil.java | 2 +-
.../provenance/toc/TestStandardTocReader.java | 20 +-
.../provenance/toc/TestStandardTocWriter.java | 4 +-
29 files changed, 1391 insertions(+), 1607 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
index a5474d5..3beab65 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
@@ -92,7 +92,7 @@ public class IndexConfiguration {
}
return firstRecord.getEventTime();
} catch (final FileNotFoundException | EOFException fnf) {
- return null; // file no longer exists or there's no record in this file
+ return null; // file no longer exists or there's no record in this file
} catch (final IOException ioe) {
logger.warn("Failed to read first entry in file {} due to {}", provenanceLogFile, ioe.toString());
logger.warn("", ioe);
@@ -201,7 +201,8 @@ public class IndexConfiguration {
* desired
* @param endTime the end time of the query for which the indices are
* desired
- * @return
+ * @return the index directories that are applicable only for the given time
+ * span (times inclusive).
*/
public List<File> getIndexDirectories(final Long startTime, final Long endTime) {
if (startTime == null && endTime == null) {
@@ -252,7 +253,8 @@ public class IndexConfiguration {
*
* @param provenanceLogFile the provenance log file for which the index
* directories are desired
- * @return
+ * @return the index directories that are applicable only for the given
+ * event log
*/
public List<File> getIndexDirectories(final File provenanceLogFile) {
final List<File> dirs = new ArrayList<>();
@@ -334,9 +336,7 @@ public class IndexConfiguration {
}
/**
- * Returns the amount of disk space in bytes used by all of the indices
- *
- * @return
+ * @return the amount of disk space in bytes used by all of the indices
*/
public long getIndexSize() {
lock.lock();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 48cc164..fe89a5e 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -139,7 +139,6 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
private final List<ExpirationAction> expirationActions = new ArrayList<>();
- private final IndexingAction indexingAction;
private final ConcurrentMap<String, AsyncQuerySubmission> querySubmissionMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, AsyncLineageSubmission> lineageSubmissionMap = new ConcurrentHashMap<>();
@@ -151,7 +150,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final AtomicBoolean repoDirty = new AtomicBoolean(false);
- // we keep the last 1000 records on hand so that when the UI is opened and it asks for the last 1000 records we don't need to
+ // we keep the last 1000 records on hand so that when the UI is opened and it asks for the last 1000 records we don't need to
// read them. Since this is a very cheap operation to keep them, it's worth the tiny expense for the improved user experience.
private final RingBuffer<ProvenanceEventRecord> latestRecords = new RingBuffer<>(1000);
private EventReporter eventReporter;
@@ -184,13 +183,6 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
this.indexManager = new IndexManager();
this.alwaysSync = configuration.isAlwaysSync();
this.rolloverCheckMillis = rolloverCheckMillis;
-
- final List<SearchableField> fields = configuration.getSearchableFields();
- if (fields != null && !fields.isEmpty()) {
- indexingAction = new IndexingAction(this, indexConfig);
- } else {
- indexingAction = null;
- }
scheduledExecService = Executors.newScheduledThreadPool(3, new NamedThreadFactory("Provenance Maintenance Thread"));
queryExecService = Executors.newFixedThreadPool(configuration.getQueryThreadPoolSize(), new NamedThreadFactory("Provenance Query Thread"));
@@ -205,69 +197,69 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
@Override
public void initialize(final EventReporter eventReporter) throws IOException {
- writeLock.lock();
- try {
- if (initialized.getAndSet(true)) {
- return;
- }
-
- this.eventReporter = eventReporter;
-
- recover();
-
- if (configuration.isAllowRollover()) {
- writers = createWriters(configuration, idGenerator.get());
- }
-
- if (configuration.isAllowRollover()) {
- scheduledExecService.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- // Check if we need to roll over
- if (needToRollover()) {
- // it appears that we do need to roll over. Obtain write lock so that we can do so, and then
- // confirm that we still need to.
- writeLock.lock();
- try {
- logger.debug("Obtained write lock to perform periodic rollover");
-
- if (needToRollover()) {
- try {
- rollover(false);
- } catch (final Exception e) {
- logger.error("Failed to roll over Provenance Event Log due to {}", e.toString());
- logger.error("", e);
- }
- }
- } finally {
- writeLock.unlock();
- }
- }
- }
- }, rolloverCheckMillis, rolloverCheckMillis, TimeUnit.MILLISECONDS);
-
- scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS);
- scheduledExecService.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- try {
- purgeOldEvents();
- } catch (final Exception e) {
- logger.error("Failed to purge old events from Provenance Repo due to {}", e.toString());
- if (logger.isDebugEnabled()) {
- logger.error("", e);
- }
- eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to purge old events from Provenance Repo due to " + e.toString());
- }
- }
- }, 1L, 1L, TimeUnit.MINUTES);
-
- expirationActions.add(new DeleteIndexAction(this, indexConfig, indexManager));
- expirationActions.add(new FileRemovalAction());
- }
- } finally {
- writeLock.unlock();
- }
+ writeLock.lock();
+ try {
+ if (initialized.getAndSet(true)) {
+ return;
+ }
+
+ this.eventReporter = eventReporter;
+
+ recover();
+
+ if (configuration.isAllowRollover()) {
+ writers = createWriters(configuration, idGenerator.get());
+ }
+
+ if (configuration.isAllowRollover()) {
+ scheduledExecService.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ // Check if we need to roll over
+ if (needToRollover()) {
+ // it appears that we do need to roll over. Obtain write lock so that we can do so, and then
+ // confirm that we still need to.
+ writeLock.lock();
+ try {
+ logger.debug("Obtained write lock to perform periodic rollover");
+
+ if (needToRollover()) {
+ try {
+ rollover(false);
+ } catch (final Exception e) {
+ logger.error("Failed to roll over Provenance Event Log due to {}", e.toString());
+ logger.error("", e);
+ }
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+ }
+ }, rolloverCheckMillis, rolloverCheckMillis, TimeUnit.MILLISECONDS);
+
+ scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS);
+ scheduledExecService.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ purgeOldEvents();
+ } catch (final Exception e) {
+ logger.error("Failed to purge old events from Provenance Repo due to {}", e.toString());
+ if (logger.isDebugEnabled()) {
+ logger.error("", e);
+ }
+ eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to purge old events from Provenance Repo due to " + e.toString());
+ }
+ }
+ }, 1L, 1L, TimeUnit.MINUTES);
+
+ expirationActions.add(new DeleteIndexAction(this, indexConfig, indexManager));
+ expirationActions.add(new FileRemovalAction());
+ }
+ } finally {
+ writeLock.unlock();
+ }
}
private static RepositoryConfiguration createRepositoryConfiguration() throws IOException {
@@ -489,28 +481,26 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
maxIdFile = file;
}
- if (firstId > maxIndexedId && indexingAction != null && indexingAction.hasBeenPerformed(file)) {
+ if (firstId > maxIndexedId) {
maxIndexedId = firstId - 1;
}
- if (firstId < minIndexedId && indexingAction != null && indexingAction.hasBeenPerformed(file)) {
+ if (firstId < minIndexedId) {
minIndexedId = firstId;
}
}
if (maxIdFile != null) {
- final boolean lastFileIndexed = indexingAction == null ? false : indexingAction.hasBeenPerformed(maxIdFile);
-
// Determine the max ID in the last file.
try (final RecordReader reader = RecordReaders.newRecordReader(maxIdFile, getAllLogFiles())) {
- final long eventId = reader.getMaxEventId();
+ final long eventId = reader.getMaxEventId();
if (eventId > maxId) {
maxId = eventId;
}
// If the ID is greater than the max indexed id and this file was indexed, then
// update the max indexed id
- if (eventId > maxIndexedId && lastFileIndexed) {
+ if (eventId > maxIndexedId) {
maxIndexedId = eventId;
}
} catch (final IOException ioe) {
@@ -567,7 +557,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
// Read the records in the last file to find its max id
if (greatestMinIdFile != null) {
try (final RecordReader recordReader = RecordReaders.newRecordReader(greatestMinIdFile, Collections.<Path>emptyList())) {
- maxId = recordReader.getMaxEventId();
+ maxId = recordReader.getMaxEventId();
}
}
@@ -604,11 +594,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
queryExecService.shutdownNow();
indexManager.close();
-
+
if ( writers != null ) {
- for (final RecordWriter writer : writers) {
- writer.close();
- }
+ for (final RecordWriter writer : writers) {
+ writer.close();
+ }
}
} finally {
writeLock.unlock();
@@ -624,7 +614,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
readLock.lock();
try {
if (repoDirty.get()) {
- logger.debug("Cannot persist provenance record because there was an IOException last time a record persistence was attempted. Will not attempt to persist more records until the repo has been rolled over.");
+ logger.debug("Cannot persist provenance record because there was an IOException last time a record persistence was attempted. "
+ + "Will not attempt to persist more records until the repo has been rolled over.");
return;
}
@@ -670,7 +661,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
} catch (final IOException ioe) {
logger.error("Failed to persist Provenance Event due to {}. Will not attempt to write to the Provenance Repository again until the repository has rolled over.", ioe.toString());
logger.error("", ioe);
- eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to persist Provenance Event due to " + ioe.toString() + ". Will not attempt to write to the Provenance Repository again until the repository has rolled over");
+ eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to persist Provenance Event due to " + ioe.toString() +
+ ". Will not attempt to write to the Provenance Repository again until the repository has rolled over");
// Switch from readLock to writeLock so that we can perform rollover
readLock.unlock();
@@ -735,9 +727,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
/**
* Returns the size, in bytes, of the Repository storage
*
- * @param logFiles
- * @param timeCutoff
- * @return
+ * @param logFiles the log files to consider
+ * @param timeCutoff if a log file's last modified date is before timeCutoff, it will be skipped
+ * @return the size of all log files given whose last mod date comes after (or equal to) timeCutoff
*/
public long getSize(final List<File> logFiles, final long timeCutoff) {
long bytesUsed = 0L;
@@ -760,7 +752,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
/**
* Purges old events from the repository
*
- * @throws IOException
+ * @throws IOException if unable to purge old events due to an I/O problem
*/
void purgeOldEvents() throws IOException {
while (!recoveryFinished.get()) {
@@ -858,12 +850,16 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
removed.add(baseName);
} catch (final FileNotFoundException fnf) {
- logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} because the file no longer exists; will not perform additional Expiration Actions on this file", currentAction, file);
+ logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} because the file no longer exists; will not "
+ + "perform additional Expiration Actions on this file", currentAction, file);
removed.add(baseName);
} catch (final Throwable t) {
- logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} due to {}; will not perform additional Expiration Actions on this file at this time", currentAction, file, t.toString());
+ logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} due to {}; will not perform additional "
+ + "Expiration Actions on this file at this time", currentAction, file, t.toString());
logger.warn("", t);
- eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to perform Expiration Action " + currentAction + " on Provenance Event file " + file + " due to " + t.toString() + "; will not perform additional Expiration Actions on this file at this time");
+ eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to perform Expiration Action " + currentAction +
+ " on Provenance Event file " + file + " due to " + t.toString() + "; will not perform additional Expiration Actions " +
+ "on this file at this time");
}
}
@@ -906,24 +902,24 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
// made protected for testing purposes
protected int getJournalCount() {
- // determine how many 'journals' we have in the journals directories
+ // determine how many 'journals' we have in the journals directories
int journalFileCount = 0;
for ( final File storageDir : configuration.getStorageDirectories() ) {
- final File journalsDir = new File(storageDir, "journals");
- final File[] journalFiles = journalsDir.listFiles();
- if ( journalFiles != null ) {
- journalFileCount += journalFiles.length;
- }
+ final File journalsDir = new File(storageDir, "journals");
+ final File[] journalFiles = journalsDir.listFiles();
+ if ( journalFiles != null ) {
+ journalFileCount += journalFiles.length;
+ }
}
-
+
return journalFileCount;
}
-
+
/**
* MUST be called with the write lock held
*
- * @param force
- * @throws IOException
+ * @param force if true, will force a rollover regardless of whether or not data has been written
+ * @throws IOException if unable to complete rollover
*/
private void rollover(final boolean force) throws IOException {
if (!configuration.isAllowRollover()) {
@@ -938,44 +934,44 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
final File writerFile = writer.getFile();
journalsToMerge.add(writerFile);
try {
- writer.close();
+ writer.close();
} catch (final IOException ioe) {
- logger.warn("Failed to close {} due to {}", writer, ioe.toString());
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
+ logger.warn("Failed to close {} due to {}", writer, ioe.toString());
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
}
}
if ( logger.isDebugEnabled() ) {
- logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
+ logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
}
int journalFileCount = getJournalCount();
final int journalCountThreshold = configuration.getJournalCount() * 5;
if ( journalFileCount > journalCountThreshold ) {
- logger.warn("The rate of the dataflow is exceeding the provenance recording rate. "
- + "Slowing down flow to accomodate. Currently, there are {} journal files and "
- + "threshold for blocking is {}", journalFileCount, journalCountThreshold);
- eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is "
- + "exceeding the provenance recording rate. Slowing down flow to accomodate");
-
- while (journalFileCount > journalCountThreshold) {
- try {
- Thread.sleep(1000L);
- } catch (final InterruptedException ie) {
- }
-
- logger.debug("Provenance Repository is still behind. Keeping flow slowed down "
- + "to accomodate. Currently, there are {} journal files and "
- + "threshold for blocking is {}", journalFileCount, journalCountThreshold);
-
- journalFileCount = getJournalCount();
- }
-
- logger.info("Provenance Repository has no caught up with rolling over journal files. Current number of "
- + "journal files to be rolled over is {}", journalFileCount);
- }
-
+ logger.warn("The rate of the dataflow is exceeding the provenance recording rate. "
+ + "Slowing down flow to accomodate. Currently, there are {} journal files and "
+ + "threshold for blocking is {}", journalFileCount, journalCountThreshold);
+ eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is "
+ + "exceeding the provenance recording rate. Slowing down flow to accomodate");
+
+ while (journalFileCount > journalCountThreshold) {
+ try {
+ Thread.sleep(1000L);
+ } catch (final InterruptedException ie) {
+ }
+
+ logger.debug("Provenance Repository is still behind. Keeping flow slowed down "
+ + "to accomodate. Currently, there are {} journal files and "
+ + "threshold for blocking is {}", journalFileCount, journalCountThreshold);
+
+ journalFileCount = getJournalCount();
+ }
+
+ logger.info("Provenance Repository has no caught up with rolling over journal files. Current number of "
+ + "journal files to be rolled over is {}", journalFileCount);
+ }
+
writers = createWriters(configuration, idGenerator.get());
streamStartTime.set(System.currentTimeMillis());
recordsWrittenSinceRollover.getAndSet(0);
@@ -989,24 +985,24 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
final Runnable rolloverRunnable = new Runnable() {
@Override
public void run() {
- try {
- final File fileRolledOver;
-
- try {
- fileRolledOver = mergeJournals(journalsToMerge, storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, latestRecords);
- repoDirty.set(false);
- } catch (final IOException ioe) {
- repoDirty.set(true);
- logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
- logger.error("", ioe);
- return;
- }
-
- if (fileRolledOver == null) {
- return;
- }
- File file = fileRolledOver;
-
+ try {
+ final File fileRolledOver;
+
+ try {
+ fileRolledOver = mergeJournals(journalsToMerge, storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, latestRecords);
+ repoDirty.set(false);
+ } catch (final IOException ioe) {
+ repoDirty.set(true);
+ logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
+ logger.error("", ioe);
+ return;
+ }
+
+ if (fileRolledOver == null) {
+ return;
+ }
+ File file = fileRolledOver;
+
// update our map of id to Path
// need lock to update the map, even though it's an AtomicReference, AtomicReference allows those doing a
// get() to obtain the most up-to-date version but we use a writeLock to prevent multiple threads modifying
@@ -1021,24 +1017,24 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
} finally {
writeLock.unlock();
}
-
- logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
- rolloverCompletions.getAndIncrement();
-
- // We have finished successfully. Cancel the future so that we don't run anymore
- Future<?> future;
- while ((future = futureReference.get()) == null) {
- try {
- Thread.sleep(10L);
- } catch (final InterruptedException ie) {
- }
- }
-
- future.cancel(false);
- } catch (final Throwable t) {
- logger.error("Failed to rollover Provenance repository due to {}", t.toString());
- logger.error("", t);
- }
+
+ logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
+ rolloverCompletions.getAndIncrement();
+
+ // We have finished successfully. Cancel the future so that we don't run anymore
+ Future<?> future;
+ while ((future = futureReference.get()) == null) {
+ try {
+ Thread.sleep(10L);
+ } catch (final InterruptedException ie) {
+ }
+ }
+
+ future.cancel(false);
+ } catch (final Throwable t) {
+ logger.error("Failed to rollover Provenance repository due to {}", t.toString());
+ logger.error("", t);
+ }
}
};
@@ -1074,10 +1070,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
for (final File journalFile : journalFiles) {
- if ( journalFile.isDirectory() ) {
- continue;
- }
-
+ if ( journalFile.isDirectory() ) {
+ continue;
+ }
+
final String basename = LuceneUtil.substringBefore(journalFile.getName(), ".");
List<File> files = journalMap.get(basename);
if (files == null) {
@@ -1120,83 +1116,84 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
return mergedFile;
}
- File mergeJournals(final List<File> journalFiles, final File storageDir, final File mergedFile, final EventReporter eventReporter, final RingBuffer<ProvenanceEventRecord> ringBuffer) throws IOException {
- logger.debug("Merging {} to {}", journalFiles, mergedFile);
- if ( this.closed ) {
- logger.info("Provenance Repository has been closed; will not merge journal files to {}", mergedFile);
- return null;
- }
-
+ File mergeJournals(final List<File> journalFiles, final File storageDir, final File mergedFile, final EventReporter eventReporter,
+ final RingBuffer<ProvenanceEventRecord> ringBuffer) throws IOException {
+ logger.debug("Merging {} to {}", journalFiles, mergedFile);
+ if ( this.closed ) {
+ logger.info("Provenance Repository has been closed; will not merge journal files to {}", mergedFile);
+ return null;
+ }
+
if (journalFiles.isEmpty()) {
return null;
}
Collections.sort(journalFiles, new Comparator<File>() {
- @Override
- public int compare(final File o1, final File o2) {
- final String suffix1 = LuceneUtil.substringAfterLast(o1.getName(), ".");
- final String suffix2 = LuceneUtil.substringAfterLast(o2.getName(), ".");
-
- try {
- final int journalIndex1 = Integer.parseInt(suffix1);
- final int journalIndex2 = Integer.parseInt(suffix2);
- return Integer.compare(journalIndex1, journalIndex2);
- } catch (final NumberFormatException nfe) {
- return o1.getName().compareTo(o2.getName());
- }
- }
+ @Override
+ public int compare(final File o1, final File o2) {
+ final String suffix1 = LuceneUtil.substringAfterLast(o1.getName(), ".");
+ final String suffix2 = LuceneUtil.substringAfterLast(o2.getName(), ".");
+
+ try {
+ final int journalIndex1 = Integer.parseInt(suffix1);
+ final int journalIndex2 = Integer.parseInt(suffix2);
+ return Integer.compare(journalIndex1, journalIndex2);
+ } catch (final NumberFormatException nfe) {
+ return o1.getName().compareTo(o2.getName());
+ }
+ }
});
-
+
final String firstJournalFile = journalFiles.get(0).getName();
final String firstFileSuffix = LuceneUtil.substringAfterLast(firstJournalFile, ".");
final boolean allPartialFiles = firstFileSuffix.equals("0");
-
+
// check if we have all of the "partial" files for the journal.
if (allPartialFiles) {
- if ( mergedFile.exists() ) {
- // we have all "partial" files and there is already a merged file. Delete the data from the index
- // because the merge file may not be fully merged. We will re-merge.
- logger.warn("Merged Journal File {} already exists; however, all partial journal files also exist "
- + "so assuming that the merge did not finish. Repeating procedure in order to ensure consistency.");
-
- final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, indexManager);
- try {
- deleteAction.execute(mergedFile);
- } catch (final Exception e) {
- logger.warn("Failed to delete records from Journal File {} from the index; this could potentially result in duplicates. Failure was due to {}", mergedFile, e.toString());
- if ( logger.isDebugEnabled() ) {
- logger.warn("", e);
- }
- }
-
- // Since we only store the file's basename, block offset, and event ID, and because the newly created file could end up on
- // a different Storage Directory than the original, we need to ensure that we delete both the partially merged
- // file and the TOC file. Otherwise, we could get the wrong copy and have issues retrieving events.
- if ( !mergedFile.delete() ) {
- logger.error("Failed to delete partially written Provenance Journal File {}. This may result in events from this journal "
- + "file not being able to be displayed. This file should be deleted manually.", mergedFile);
- }
-
- final File tocFile = TocUtil.getTocFile(mergedFile);
- if ( tocFile.exists() && !tocFile.delete() ) {
- logger.error("Failed to delete .toc file {}; this may result in not being able to read the Provenance Events from the {} Journal File. "
- + "This can be corrected by manually deleting the {} file", tocFile, mergedFile, tocFile);
- }
- }
+ if ( mergedFile.exists() ) {
+ // we have all "partial" files and there is already a merged file. Delete the data from the index
+ // because the merge file may not be fully merged. We will re-merge.
+ logger.warn("Merged Journal File {} already exists; however, all partial journal files also exist "
+ + "so assuming that the merge did not finish. Repeating procedure in order to ensure consistency.");
+
+ final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, indexManager);
+ try {
+ deleteAction.execute(mergedFile);
+ } catch (final Exception e) {
+ logger.warn("Failed to delete records from Journal File {} from the index; this could potentially result in duplicates. Failure was due to {}", mergedFile, e.toString());
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", e);
+ }
+ }
+
+ // Since we only store the file's basename, block offset, and event ID, and because the newly created file could end up on
+ // a different Storage Directory than the original, we need to ensure that we delete both the partially merged
+ // file and the TOC file. Otherwise, we could get the wrong copy and have issues retrieving events.
+ if ( !mergedFile.delete() ) {
+ logger.error("Failed to delete partially written Provenance Journal File {}. This may result in events from this journal "
+ + "file not being able to be displayed. This file should be deleted manually.", mergedFile);
+ }
+
+ final File tocFile = TocUtil.getTocFile(mergedFile);
+ if ( tocFile.exists() && !tocFile.delete() ) {
+ logger.error("Failed to delete .toc file {}; this may result in not being able to read the Provenance Events from the {} Journal File. "
+ + "This can be corrected by manually deleting the {} file", tocFile, mergedFile, tocFile);
+ }
+ }
} else {
- logger.warn("Cannot merge journal files {} because expected first file to end with extension '.0' "
- + "but it did not; assuming that the files were already merged but only some finished deletion "
- + "before restart. Deleting remaining partial journal files.", journalFiles);
-
- for ( final File file : journalFiles ) {
- if ( !file.delete() && file.exists() ) {
- logger.warn("Failed to delete unneeded journal file {}; this file should be cleaned up manually", file);
- }
- }
-
- return null;
- }
-
+ logger.warn("Cannot merge journal files {} because expected first file to end with extension '.0' "
+ + "but it did not; assuming that the files were already merged but only some finished deletion "
+ + "before restart. Deleting remaining partial journal files.", journalFiles);
+
+ for ( final File file : journalFiles ) {
+ if ( !file.delete() && file.exists() ) {
+ logger.warn("Failed to delete unneeded journal file {}; this file should be cleaned up manually", file);
+ }
+ }
+
+ return null;
+ }
+
final long startNanos = System.nanoTime();
// Map each journal to a RecordReader
@@ -1241,12 +1238,14 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
record = reader.nextRecord();
} catch (final EOFException eof) {
} catch (final Exception e) {
- logger.warn("Failed to generate Provenance Event Record from Journal due to " + e + "; it's possible that the record wasn't completely written to the file. This record will be skipped.");
+ logger.warn("Failed to generate Provenance Event Record from Journal due to " + e + "; it's possible that the record wasn't "
+ + "completely written to the file. This record will be skipped.");
if (logger.isDebugEnabled()) {
logger.warn("", e);
}
- eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to read Provenance Event Record from Journal due to " + e + "; it's possible that hte record wasn't completely written to the file. This record will be skipped.");
+ eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to read Provenance Event Record from Journal due to " + e +
+ "; it's possible that hte record wasn't completely written to the file. This record will be skipped.");
}
if (record == null) {
@@ -1261,47 +1260,47 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
try (final RecordWriter writer = RecordWriters.newRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) {
writer.writeHeader();
- final IndexingAction indexingAction = new IndexingAction(this, indexConfig);
-
+ final IndexingAction indexingAction = new IndexingAction(this);
+
final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile);
final IndexWriter indexWriter = indexManager.borrowIndexWriter(indexingDirectory);
try {
- long maxId = 0L;
-
- while (!recordToReaderMap.isEmpty()) {
- final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
- final StandardProvenanceEventRecord record = entry.getKey();
- final RecordReader reader = entry.getValue();
-
- writer.writeRecord(record, record.getEventId());
- final int blockIndex = writer.getTocWriter().getCurrentBlockIndex();
-
- indexingAction.index(record, indexWriter, blockIndex);
- maxId = record.getEventId();
-
- ringBuffer.add(record);
- records++;
-
- // Remove this entry from the map
- recordToReaderMap.remove(record);
-
- // Get the next entry from this reader and add it to the map
- StandardProvenanceEventRecord nextRecord = null;
-
- try {
- nextRecord = reader.nextRecord();
- } catch (final EOFException eof) {
- }
-
- if (nextRecord != null) {
- recordToReaderMap.put(nextRecord, reader);
- }
- }
-
- indexWriter.commit();
- indexConfig.setMaxIdIndexed(maxId);
+ long maxId = 0L;
+
+ while (!recordToReaderMap.isEmpty()) {
+ final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
+ final StandardProvenanceEventRecord record = entry.getKey();
+ final RecordReader reader = entry.getValue();
+
+ writer.writeRecord(record, record.getEventId());
+ final int blockIndex = writer.getTocWriter().getCurrentBlockIndex();
+
+ indexingAction.index(record, indexWriter, blockIndex);
+ maxId = record.getEventId();
+
+ ringBuffer.add(record);
+ records++;
+
+ // Remove this entry from the map
+ recordToReaderMap.remove(record);
+
+ // Get the next entry from this reader and add it to the map
+ StandardProvenanceEventRecord nextRecord = null;
+
+ try {
+ nextRecord = reader.nextRecord();
+ } catch (final EOFException eof) {
+ }
+
+ if (nextRecord != null) {
+ recordToReaderMap.put(nextRecord, reader);
+ }
+ }
+
+ indexWriter.commit();
+ indexConfig.setMaxIdIndexed(maxId);
} finally {
- indexManager.returnIndexWriter(indexingDirectory, indexWriter);
+ indexManager.returnIndexWriter(indexingDirectory, indexWriter);
}
}
} finally {
@@ -1319,7 +1318,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
logger.warn("Failed to remove temporary journal file {}; this file should be cleaned up manually", journalFile.getAbsolutePath());
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal file " + journalFile.getAbsolutePath() + "; this file should be cleaned up manually");
}
-
+
final File tocFile = TocUtil.getTocFile(journalFile);
if (!tocFile.delete() && tocFile.exists()) {
logger.warn("Failed to remove temporary journal TOC file {}; this file should be cleaned up manually", tocFile.getAbsolutePath());
@@ -1374,7 +1373,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
public QuerySubmission submitQuery(final Query query) {
final int numQueries = querySubmissionMap.size();
if (numQueries > MAX_UNDELETED_QUERY_RESULTS) {
- throw new IllegalStateException("Cannot process query because there are currently " + numQueries + " queries whose results have not been deleted due to poorly behaving clients not issuing DELETE requests. Please try again later.");
+ throw new IllegalStateException("Cannot process query because there are currently " + numQueries + " queries whose results have not "
+ + "been deleted due to poorly behaving clients not issuing DELETE requests. Please try again later.");
}
if (query.getEndDate() != null && query.getStartDate() != null && query.getStartDate().getTime() > query.getEndDate().getTime()) {
@@ -1416,7 +1416,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
final AtomicInteger retrievalCount = new AtomicInteger(0);
final List<File> indexDirectories = indexConfig.getIndexDirectories(
query.getStartDate() == null ? null : query.getStartDate().getTime(),
- query.getEndDate() == null ? null : query.getEndDate().getTime());
+ query.getEndDate() == null ? null : query.getEndDate().getTime());
final AsyncQuerySubmission result = new AsyncQuerySubmission(query, indexDirectories.size());
querySubmissionMap.put(query.getIdentifier(), result);
@@ -1432,11 +1432,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
/**
- * REMOVE-ME: This is for testing only and can be removed.
+ * This is for testing only and not actually used other than in debugging
*
- * @param luceneQuery
- * @return
- * @throws IOException
+ * @param luceneQuery the lucene query to execute
+ * @return an Iterator of ProvenanceEventRecord that match the query
+ * @throws IOException if unable to perform the query
*/
public Iterator<ProvenanceEventRecord> queryLucene(final org.apache.lucene.search.Query luceneQuery) throws IOException {
final List<File> indexFiles = indexConfig.getIndexDirectories();
@@ -1601,7 +1601,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
return computeLineage(Collections.<String>singleton(flowFileUuid), LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE);
}
- private Lineage computeLineage(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId, final Long startTimestamp, final Long endTimestamp) throws IOException {
+ private Lineage computeLineage(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId, final Long startTimestamp,
+ final Long endTimestamp) throws IOException {
final AsyncLineageSubmission submission = submitLineageComputation(flowFileUuids, computationType, eventId, startTimestamp, endTimestamp);
final StandardLineageResult result = submission.getResult();
while (!result.isFinished()) {
@@ -1623,7 +1624,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
return submitLineageComputation(Collections.singleton(flowFileUuid), LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE);
}
- private AsyncLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId, final long startTimestamp, final long endTimestamp) {
+ private AsyncLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final LineageComputationType computationType,
+ final Long eventId, final long startTimestamp, final long endTimestamp) {
final List<File> indexDirs = indexConfig.getIndexDirectories(startTimestamp, endTimestamp);
final AsyncLineageSubmission result = new AsyncLineageSubmission(computationType, eventId, flowFileUuids, indexDirs.size());
lineageSubmissionMap.put(result.getLineageIdentifier(), result);
@@ -1647,16 +1649,16 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
switch (event.getEventType()) {
- case CLONE:
- case FORK:
- case JOIN:
- case REPLAY:
- return submitLineageComputation(event.getChildUuids(), LineageComputationType.EXPAND_CHILDREN, eventId, event.getEventTime(), Long.MAX_VALUE);
- default:
- final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
- lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
- submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded");
- return submission;
+ case CLONE:
+ case FORK:
+ case JOIN:
+ case REPLAY:
+ return submitLineageComputation(event.getChildUuids(), LineageComputationType.EXPAND_CHILDREN, eventId, event.getEventTime(), Long.MAX_VALUE);
+ default:
+ final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
+ lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
+ submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded");
+ return submission;
}
} catch (final IOException ioe) {
final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
@@ -1684,17 +1686,17 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
switch (event.getEventType()) {
- case JOIN:
- case FORK:
- case CLONE:
- case REPLAY:
- return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId, 0L, event.getEventTime());
- default: {
- final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1);
- lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
- submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded");
- return submission;
- }
+ case JOIN:
+ case FORK:
+ case CLONE:
+ case REPLAY:
+ return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId, 0L, event.getEventTime());
+ default: {
+ final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1);
+ lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
+ submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded");
+ return submission;
+ }
}
} catch (final IOException ioe) {
final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
index 3951591..d0d147c 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
@@ -34,7 +34,7 @@ public class RepositoryConfiguration {
private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB
private int journalCount = 16;
private int compressionBlockBytes = 1024 * 1024;
-
+
private List<SearchableField> searchableFields = new ArrayList<>();
private List<SearchableField> searchableAttributes = new ArrayList<>();
private boolean compress = true;
@@ -50,19 +50,19 @@ public class RepositoryConfiguration {
return allowRollover;
}
-
+
public int getCompressionBlockBytes() {
- return compressionBlockBytes;
- }
+ return compressionBlockBytes;
+ }
- public void setCompressionBlockBytes(int compressionBlockBytes) {
- this.compressionBlockBytes = compressionBlockBytes;
- }
+ public void setCompressionBlockBytes(int compressionBlockBytes) {
+ this.compressionBlockBytes = compressionBlockBytes;
+ }
- /**
+ /**
* Specifies where the repository will store data
*
- * @return
+ * @return the directories where provenance files will be stored
*/
public List<File> getStorageDirectories() {
return Collections.unmodifiableList(storageDirectories);
@@ -71,18 +71,15 @@ public class RepositoryConfiguration {
/**
* Specifies where the repository should store data
*
- * @param storageDirectory
+ * @param storageDirectory the directory to store provenance files
*/
public void addStorageDirectory(final File storageDirectory) {
this.storageDirectories.add(storageDirectory);
}
/**
- * Returns the minimum amount of time that a given record will stay in the
- * repository
- *
- * @param timeUnit
- * @return
+ * @param timeUnit the desired time unit
+ * @return the max amount of time that a given record will stay in the repository
*/
public long getMaxRecordLife(final TimeUnit timeUnit) {
return timeUnit.convert(recordLifeMillis, TimeUnit.MILLISECONDS);
@@ -91,8 +88,8 @@ public class RepositoryConfiguration {
/**
* Specifies how long a record should stay in the repository
*
- * @param maxRecordLife
- * @param timeUnit
+ * @param maxRecordLife the max amount of time to keep a record in the repo
+ * @param timeUnit the period of time used by maxRecordLife
*/
public void setMaxRecordLife(final long maxRecordLife, final TimeUnit timeUnit) {
this.recordLifeMillis = TimeUnit.MILLISECONDS.convert(maxRecordLife, timeUnit);
@@ -101,7 +98,7 @@ public class RepositoryConfiguration {
/**
* Returns the maximum amount of data to store in the repository (in bytes)
*
- * @return
+ * @return the maximum amount of disk space to use for the prov repo
*/
public long getMaxStorageCapacity() {
return storageCapacity;
@@ -109,107 +106,91 @@ public class RepositoryConfiguration {
/**
* Sets the maximum amount of data to store in the repository (in bytes)
- * @param maxStorageCapacity
+ *
+ * @param maxStorageCapacity the maximum amount of disk space to use for the prov repo
*/
public void setMaxStorageCapacity(final long maxStorageCapacity) {
this.storageCapacity = maxStorageCapacity;
}
/**
- * Returns the maximum amount of time to write to a single event file
- *
- * @param timeUnit
- * @return
+ * @param timeUnit the desired time unit for the returned value
+ * @return the maximum amount of time that the repo will write to a single event file
*/
public long getMaxEventFileLife(final TimeUnit timeUnit) {
return timeUnit.convert(eventFileMillis, TimeUnit.MILLISECONDS);
}
/**
- * Sets the maximum amount of time to write to a single event file
- *
- * @param maxEventFileTime
- * @param timeUnit
+ * @param maxEventFileTime the max amount of time to write to a single event file
+ * @param timeUnit the units for the value supplied by maxEventFileTime
*/
public void setMaxEventFileLife(final long maxEventFileTime, final TimeUnit timeUnit) {
this.eventFileMillis = TimeUnit.MILLISECONDS.convert(maxEventFileTime, timeUnit);
}
/**
- * Returns the maximum number of bytes (pre-compression) that will be
+ * @return the maximum number of bytes (pre-compression) that will be
* written to a single event file before the file is rolled over
- *
- * @return
*/
public long getMaxEventFileCapacity() {
return eventFileBytes;
}
/**
- * Sets the maximum number of bytes (pre-compression) that will be written
+ * @param maxEventFileBytes the maximum number of bytes (pre-compression) that will be written
* to a single event file before the file is rolled over
- *
- * @param maxEventFileBytes
*/
public void setMaxEventFileCapacity(final long maxEventFileBytes) {
this.eventFileBytes = maxEventFileBytes;
}
/**
- * Returns the fields that can be indexed
- *
- * @return
+ * @return the fields that should be indexed
*/
public List<SearchableField> getSearchableFields() {
return Collections.unmodifiableList(searchableFields);
}
/**
- * Sets the fields to index
- *
- * @param searchableFields
+ * @param searchableFields the fields to index
*/
public void setSearchableFields(final List<SearchableField> searchableFields) {
this.searchableFields = new ArrayList<>(searchableFields);
}
/**
- * Returns the FlowFile attributes that can be indexed
- *
- * @return
+ * @return the FlowFile attributes that should be indexed
*/
public List<SearchableField> getSearchableAttributes() {
return Collections.unmodifiableList(searchableAttributes);
}
/**
- * Sets the FlowFile attributes to index
- *
- * @param searchableAttributes
+ * @param searchableAttributes the FlowFile attributes to index
*/
public void setSearchableAttributes(final List<SearchableField> searchableAttributes) {
this.searchableAttributes = new ArrayList<>(searchableAttributes);
}
/**
- * Indicates whether or not event files will be compressed when they are
+ * @return whether or not event files will be compressed when they are
* rolled over
- *
- * @return
*/
public boolean isCompressOnRollover() {
return compress;
}
/**
- * Specifies whether or not to compress event files on rollover
- *
- * @param compress
+ * @param compress if true, the data will be compressed when rolled over
*/
public void setCompressOnRollover(final boolean compress) {
this.compress = compress;
}
+ /**
+ * @return the number of threads to use to query the repo
+ */
public int getQueryThreadPoolSize() {
return queryThreadPoolSize;
}
@@ -246,27 +227,23 @@ public class RepositoryConfiguration {
* </li>
* </ol>
*
- * @param bytes
+ * @param bytes the number of bytes to write to an index before beginning a new shard
*/
public void setDesiredIndexSize(final long bytes) {
this.desiredIndexBytes = bytes;
}
/**
- * Returns the desired size of each index shard. See the
- * {@Link #setDesiredIndexSize} method for an explanation of why we choose
+ * @return the desired size of each index shard. See the
+ * {@link #setDesiredIndexSize} method for an explanation of why we choose
* to shard the index.
- *
- * @return
*/
public long getDesiredIndexSize() {
return desiredIndexBytes;
}
/**
- * Sets the number of Journal files to use when persisting records.
- *
- * @param numJournals
+ * @param numJournals the number of Journal files to use when persisting records.
*/
public void setJournalCount(final int numJournals) {
if (numJournals < 1) {
@@ -277,19 +254,14 @@ public class RepositoryConfiguration {
}
/**
- * Returns the number of Journal files that will be used when persisting
- * records.
- *
- * @return
+ * @return the number of Journal files that will be used when persisting records.
*/
public int getJournalCount() {
return journalCount;
}
/**
- * Specifies whether or not the Repository should sync all updates to disk.
- *
- * @return
+ * @return <code>true</code> if the repository will perform an 'fsync' for all updates to disk
*/
public boolean isAlwaysSync() {
return alwaysSync;
@@ -301,7 +273,7 @@ public class RepositoryConfiguration {
* persisted across restarted, even if there is a power failure or a sudden
* Operating System crash, but it can be very expensive.
*
- * @param alwaysSync
+ * @param alwaysSync whether or not to perform an 'fsync' for all updates to disk
*/
public void setAlwaysSync(boolean alwaysSync) {
this.alwaysSync = alwaysSync;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
index 9bbf195..ca0d5ed 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
@@ -39,40 +39,40 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StandardRecordReader implements RecordReader {
- private static final Logger logger = LoggerFactory.getLogger(StandardRecordReader.class);
-
- private final ByteCountingInputStream rawInputStream;
+ private static final Logger logger = LoggerFactory.getLogger(StandardRecordReader.class);
+
+ private final ByteCountingInputStream rawInputStream;
private final String filename;
private final int serializationVersion;
private final boolean compressed;
private final TocReader tocReader;
private final int headerLength;
-
+
private DataInputStream dis;
private ByteCountingInputStream byteCountingIn;
public StandardRecordReader(final InputStream in, final String filename) throws IOException {
- this(in, filename, null);
+ this(in, filename, null);
}
-
+
public StandardRecordReader(final InputStream in, final String filename, final TocReader tocReader) throws IOException {
- logger.trace("Creating RecordReader for {}", filename);
-
- rawInputStream = new ByteCountingInputStream(in);
+ logger.trace("Creating RecordReader for {}", filename);
+
+ rawInputStream = new ByteCountingInputStream(in);
final InputStream limitedStream;
if ( tocReader == null ) {
- limitedStream = rawInputStream;
+ limitedStream = rawInputStream;
} else {
- final long offset1 = tocReader.getBlockOffset(1);
- if ( offset1 < 0 ) {
- limitedStream = rawInputStream;
- } else {
- limitedStream = new LimitingInputStream(rawInputStream, offset1 - rawInputStream.getBytesConsumed());
- }
- }
-
- final InputStream readableStream;
+ final long offset1 = tocReader.getBlockOffset(1);
+ if ( offset1 < 0 ) {
+ limitedStream = rawInputStream;
+ } else {
+ limitedStream = new LimitingInputStream(rawInputStream, offset1 - rawInputStream.getBytesConsumed());
+ }
+ }
+
+ final InputStream readableStream;
if (filename.endsWith(".gz")) {
readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream));
compressed = true;
@@ -83,11 +83,11 @@ public class StandardRecordReader implements RecordReader {
byteCountingIn = new ByteCountingInputStream(readableStream);
dis = new DataInputStream(byteCountingIn);
-
+
final String repoClassName = dis.readUTF();
final int serializationVersion = dis.readInt();
- headerLength = repoClassName.getBytes(StandardCharsets.UTF_8).length + 2 + 4; // 2 bytes for string length, 4 for integer.
-
+ headerLength = repoClassName.getBytes(StandardCharsets.UTF_8).length + 2 + 4; // 2 bytes for string length, 4 for integer.
+
if (serializationVersion < 1 || serializationVersion > 8) {
throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion + " and supported versions are 1-8");
}
@@ -99,52 +99,52 @@ public class StandardRecordReader implements RecordReader {
@Override
public void skipToBlock(final int blockIndex) throws IOException {
- if ( tocReader == null ) {
- throw new IllegalStateException("Cannot skip to block " + blockIndex + " for Provenance Log " + filename + " because no Table-of-Contents file was found for this Log");
- }
-
- if ( blockIndex < 0 ) {
- throw new IllegalArgumentException("Cannot skip to block " + blockIndex + " because the value is negative");
- }
-
- if ( blockIndex == getBlockIndex() ) {
- return;
- }
-
- final long offset = tocReader.getBlockOffset(blockIndex);
- if ( offset < 0 ) {
- throw new IOException("Unable to find block " + blockIndex + " in Provenance Log " + filename);
- }
-
- final long curOffset = rawInputStream.getBytesConsumed();
-
- final long bytesToSkip = offset - curOffset;
- if ( bytesToSkip >= 0 ) {
- try {
- StreamUtils.skip(rawInputStream, bytesToSkip);
- logger.debug("Skipped stream from offset {} to {} ({} bytes skipped)", curOffset, offset, bytesToSkip);
- } catch (final IOException e) {
- throw new IOException("Failed to skip to offset " + offset + " for block " + blockIndex + " of Provenance Log " + filename, e);
- }
-
- resetStreamForNextBlock();
- }
+ if ( tocReader == null ) {
+ throw new IllegalStateException("Cannot skip to block " + blockIndex + " for Provenance Log " + filename + " because no Table-of-Contents file was found for this Log");
+ }
+
+ if ( blockIndex < 0 ) {
+ throw new IllegalArgumentException("Cannot skip to block " + blockIndex + " because the value is negative");
+ }
+
+ if ( blockIndex == getBlockIndex() ) {
+ return;
+ }
+
+ final long offset = tocReader.getBlockOffset(blockIndex);
+ if ( offset < 0 ) {
+ throw new IOException("Unable to find block " + blockIndex + " in Provenance Log " + filename);
+ }
+
+ final long curOffset = rawInputStream.getBytesConsumed();
+
+ final long bytesToSkip = offset - curOffset;
+ if ( bytesToSkip >= 0 ) {
+ try {
+ StreamUtils.skip(rawInputStream, bytesToSkip);
+ logger.debug("Skipped stream from offset {} to {} ({} bytes skipped)", curOffset, offset, bytesToSkip);
+ } catch (final IOException e) {
+ throw new IOException("Failed to skip to offset " + offset + " for block " + blockIndex + " of Provenance Log " + filename, e);
+ }
+
+ resetStreamForNextBlock();
+ }
}
-
+
private void resetStreamForNextBlock() throws IOException {
- final InputStream limitedStream;
+ final InputStream limitedStream;
if ( tocReader == null ) {
- limitedStream = rawInputStream;
+ limitedStream = rawInputStream;
} else {
- final long offset = tocReader.getBlockOffset(1 + getBlockIndex());
- if ( offset < 0 ) {
- limitedStream = rawInputStream;
- } else {
- limitedStream = new LimitingInputStream(rawInputStream, offset - rawInputStream.getBytesConsumed());
- }
- }
-
- final InputStream readableStream;
+ final long offset = tocReader.getBlockOffset(1 + getBlockIndex());
+ if ( offset < 0 ) {
+ limitedStream = rawInputStream;
+ } else {
+ limitedStream = new LimitingInputStream(rawInputStream, offset - rawInputStream.getBytesConsumed());
+ }
+ }
+
+ final InputStream readableStream;
if (compressed) {
readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream));
} else {
@@ -154,32 +154,32 @@ public class StandardRecordReader implements RecordReader {
byteCountingIn = new ByteCountingInputStream(readableStream, rawInputStream.getBytesConsumed());
dis = new DataInputStream(byteCountingIn);
}
-
-
+
+
@Override
public TocReader getTocReader() {
- return tocReader;
+ return tocReader;
}
-
+
@Override
public boolean isBlockIndexAvailable() {
- return tocReader != null;
+ return tocReader != null;
}
-
+
@Override
public int getBlockIndex() {
- if ( tocReader == null ) {
- throw new IllegalStateException("Cannot determine Block Index because no Table-of-Contents could be found for Provenance Log " + filename);
- }
-
- return tocReader.getBlockIndex(rawInputStream.getBytesConsumed());
+ if ( tocReader == null ) {
+ throw new IllegalStateException("Cannot determine Block Index because no Table-of-Contents could be found for Provenance Log " + filename);
+ }
+
+ return tocReader.getBlockIndex(rawInputStream.getBytesConsumed());
}
-
+
@Override
public long getBytesConsumed() {
- return byteCountingIn.getBytesConsumed();
+ return byteCountingIn.getBytesConsumed();
}
-
+
private StandardProvenanceEventRecord readPreVersion6Record() throws IOException {
final long startOffset = byteCountingIn.getBytesConsumed();
@@ -374,17 +374,17 @@ public class StandardRecordReader implements RecordReader {
}
private String readUUID(final DataInputStream in) throws IOException {
- if ( serializationVersion < 8 ) {
- final long msb = in.readLong();
- final long lsb = in.readLong();
- return new UUID(msb, lsb).toString();
- } else {
- // before version 8, we serialized UUID's as two longs in order to
- // write less data. However, in version 8 we changed to just writing
- // out the string because it's extremely expensive to call UUID.fromString.
- // In the end, since we generally compress, the savings in minimal anyway.
- return in.readUTF();
- }
+ if ( serializationVersion < 8 ) {
+ final long msb = in.readLong();
+ final long lsb = in.readLong();
+ return new UUID(msb, lsb).toString();
+ } else {
+ // before version 8, we serialized UUID's as two longs in order to
+ // write less data. However, in version 8 we changed to just writing
+ // out the string because it's extremely expensive to call UUID.fromString.
+ // In the end, since we generally compress, the savings in minimal anyway.
+ return in.readUTF();
+ }
}
private String readNullableString(final DataInputStream in) throws IOException {
@@ -416,53 +416,53 @@ public class StandardRecordReader implements RecordReader {
byteCountingIn.mark(1);
int nextByte = byteCountingIn.read();
byteCountingIn.reset();
-
+
if ( nextByte < 0 ) {
- try {
- resetStreamForNextBlock();
- } catch (final EOFException eof) {
- return false;
- }
-
+ try {
+ resetStreamForNextBlock();
+ } catch (final EOFException eof) {
+ return false;
+ }
+
byteCountingIn.mark(1);
nextByte = byteCountingIn.read();
byteCountingIn.reset();
}
-
+
return (nextByte >= 0);
}
-
+
@Override
public long getMaxEventId() throws IOException {
- if ( tocReader != null ) {
- final long lastBlockOffset = tocReader.getLastBlockOffset();
- skipToBlock(tocReader.getBlockIndex(lastBlockOffset));
- }
-
- ProvenanceEventRecord record;
- ProvenanceEventRecord lastRecord = null;
- try {
- while ((record = nextRecord()) != null) {
- lastRecord = record;
- }
- } catch (final EOFException eof) {
- // This can happen if we stop NIFi while the record is being written.
- // This is OK, we just ignore this record. The session will not have been
- // committed, so we can just process the FlowFile again.
- }
-
- return (lastRecord == null) ? -1L : lastRecord.getEventId();
+ if ( tocReader != null ) {
+ final long lastBlockOffset = tocReader.getLastBlockOffset();
+ skipToBlock(tocReader.getBlockIndex(lastBlockOffset));
+ }
+
+ ProvenanceEventRecord record;
+ ProvenanceEventRecord lastRecord = null;
+ try {
+ while ((record = nextRecord()) != null) {
+ lastRecord = record;
+ }
+ } catch (final EOFException eof) {
+ // This can happen if we stop NIFi while the record is being written.
+ // This is OK, we just ignore this record. The session will not have been
+ // committed, so we can just process the FlowFile again.
+ }
+
+ return (lastRecord == null) ? -1L : lastRecord.getEventId();
}
@Override
public void close() throws IOException {
- logger.trace("Closing Record Reader for {}", filename);
-
+ logger.trace("Closing Record Reader for {}", filename);
+
dis.close();
rawInputStream.close();
-
+
if ( tocReader != null ) {
- tocReader.close();
+ tocReader.close();
}
}
@@ -473,9 +473,9 @@ public class StandardRecordReader implements RecordReader {
@Override
public void skipTo(final long position) throws IOException {
- // we are subtracting headerLength from the number of bytes consumed because we used to
- // consider the offset of the first record "0" - now we consider it whatever position it
- // it really is in the stream.
+ // we are subtracting headerLength from the number of bytes consumed because we used to
+ // consider the offset of the first record "0" - now we consider it whatever position it
+ // it really is in the stream.
final long currentPosition = byteCountingIn.getBytesConsumed() - headerLength;
if (currentPosition == position) {
return;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
index dbb2c48..3095f13 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
@@ -36,15 +36,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StandardRecordWriter implements RecordWriter {
- private static final Logger logger = LoggerFactory.getLogger(StandardRecordWriter.class);
-
+ private static final Logger logger = LoggerFactory.getLogger(StandardRecordWriter.class);
+
private final File file;
private final FileOutputStream fos;
private final ByteCountingOutputStream rawOutStream;
private final TocWriter tocWriter;
private final boolean compressed;
private final int uncompressedBlockSize;
-
+
private DataOutputStream out;
private ByteCountingOutputStream byteCountingOut;
private long lastBlockOffset = 0L;
@@ -52,21 +52,21 @@ public class StandardRecordWriter implements RecordWriter {
private final Lock lock = new ReentrantLock();
-
+
public StandardRecordWriter(final File file, final TocWriter writer, final boolean compressed, final int uncompressedBlockSize) throws IOException {
- logger.trace("Creating Record Writer for {}", file.getName());
-
+ logger.trace("Creating Record Writer for {}", file.getName());
+
this.file = file;
this.compressed = compressed;
this.fos = new FileOutputStream(file);
rawOutStream = new ByteCountingOutputStream(fos);
this.uncompressedBlockSize = uncompressedBlockSize;
-
+
this.tocWriter = writer;
}
static void writeUUID(final DataOutputStream out, final String uuid) throws IOException {
- out.writeUTF(uuid);
+ out.writeUTF(uuid);
}
static void writeUUIDs(final DataOutputStream out, final Collection<String> list) throws IOException {
@@ -85,49 +85,49 @@ public class StandardRecordWriter implements RecordWriter {
return file;
}
- @Override
+ @Override
public synchronized void writeHeader() throws IOException {
lastBlockOffset = rawOutStream.getBytesWritten();
resetWriteStream();
-
+
out.writeUTF(PersistentProvenanceRepository.class.getName());
out.writeInt(PersistentProvenanceRepository.SERIALIZATION_VERSION);
out.flush();
}
-
+
private void resetWriteStream() throws IOException {
- if ( out != null ) {
- out.flush();
- }
-
- final long byteOffset = (byteCountingOut == null) ? rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten();
-
- final OutputStream writableStream;
- if ( compressed ) {
- // because of the way that GZIPOutputStream works, we need to call close() on it in order for it
- // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
- // the underlying OutputStream in a NonCloseableOutputStream
- if ( out != null ) {
- out.close();
- }
-
- if ( tocWriter != null ) {
- tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
- }
-
- writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536);
- } else {
- if ( tocWriter != null ) {
- tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
- }
-
- writableStream = new BufferedOutputStream(rawOutStream, 65536);
- }
-
+ if ( out != null ) {
+ out.flush();
+ }
+
+ final long byteOffset = (byteCountingOut == null) ? rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten();
+
+ final OutputStream writableStream;
+ if ( compressed ) {
+ // because of the way that GZIPOutputStream works, we need to call close() on it in order for it
+ // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
+ // the underlying OutputStream in a NonCloseableOutputStream
+ if ( out != null ) {
+ out.close();
+ }
+
+ if ( tocWriter != null ) {
+ tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
+ }
+
+ writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536);
+ } else {
+ if ( tocWriter != null ) {
+ tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
+ }
+
+ writableStream = new BufferedOutputStream(rawOutStream, 65536);
+ }
+
this.byteCountingOut = new ByteCountingOutputStream(writableStream, byteOffset);
this.out = new DataOutputStream(byteCountingOut);
}
-
+
@Override
public synchronized long writeRecord(final ProvenanceEventRecord record, long recordIdentifier) throws IOException {
@@ -136,16 +136,16 @@ public class StandardRecordWriter implements RecordWriter {
// add a new block to the TOC if needed.
if ( tocWriter != null && (startBytes - lastBlockOffset >= uncompressedBlockSize) ) {
- lastBlockOffset = startBytes;
-
- if ( compressed ) {
- // because of the way that GZIPOutputStream works, we need to call close() on it in order for it
- // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
- // the underlying OutputStream in a NonCloseableOutputStream
- resetWriteStream();
- }
+ lastBlockOffset = startBytes;
+
+ if ( compressed ) {
+ // because of the way that GZIPOutputStream works, we need to call close() on it in order for it
+ // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
+ // the underlying OutputStream in a NonCloseableOutputStream
+ resetWriteStream();
+ }
}
-
+
out.writeLong(recordIdentifier);
out.writeUTF(record.getEventType().name());
out.writeLong(record.getEventTime());
@@ -175,7 +175,7 @@ public class StandardRecordWriter implements RecordWriter {
writeLongNullableString(out, entry.getValue());
}
- // If Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'.
+ // If Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'.
if (record.getContentClaimSection() != null && record.getContentClaimContainer() != null && record.getContentClaimIdentifier() != null) {
out.writeBoolean(true);
out.writeUTF(record.getContentClaimContainer());
@@ -261,24 +261,24 @@ public class StandardRecordWriter implements RecordWriter {
@Override
public synchronized void close() throws IOException {
- logger.trace("Closing Record Writer for {}", file.getName());
-
+ logger.trace("Closing Record Writer for {}", file.getName());
+
lock();
try {
- try {
- out.flush();
- out.close();
- } finally {
- rawOutStream.close();
-
- if ( tocWriter != null ) {
- tocWriter.close();
- }
- }
+ try {
+ out.flush();
+ out.close();
+ } finally {
+ rawOutStream.close();
+
+ if ( tocWriter != null ) {
+ tocWriter.close();
+ }
+ }
} finally {
unlock();
}
-
+
}
@Override
@@ -308,14 +308,14 @@ public class StandardRecordWriter implements RecordWriter {
@Override
public void sync() throws IOException {
- if ( tocWriter != null ) {
- tocWriter.sync();
- }
- fos.getFD().sync();
+ if ( tocWriter != null ) {
+ tocWriter.sync();
+ }
+ fos.getFD().sync();
}
-
+
@Override
public TocWriter getTocWriter() {
- return tocWriter;
+ return tocWriter;
}
}
[2/4] incubator-nifi git commit: NIFI-527: Code cleanup
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java
index 8c266d1..0ffa5e6 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java
@@ -25,9 +25,9 @@ public interface ExpirationAction {
* Performs some action against the given File and returns the new File that
* contains the modified version
*
- * @param expiredFile
- * @return
- * @throws IOException
+ * @param expiredFile the file that was expired
+ * @return the new file after the file has been renamed, or the expiredFile if the file was not renamed
+ * @throws IOException if there was an IO problem
*/
File execute(File expiredFile) throws IOException;
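
The amended javadoc pins down the contract: return the renamed File, or the original expiredFile when no rename took place. A hypothetical implementation of that contract (the class name and ".expired" suffix are illustrative; hasBeenPerformed is assumed to parallel the RolloverAction interface removed later in this commit):

    import java.io.File;
    import java.io.IOException;

    // Renames an expired journal and returns the new File; on failure, returns
    // the original expiredFile, as the documented contract requires.
    public class RenameExpiredFileAction implements ExpirationAction {

        @Override
        public File execute(final File expiredFile) throws IOException {
            final File renamed = new File(expiredFile.getParentFile(), expiredFile.getName() + ".expired");
            return expiredFile.renameTo(renamed) ? renamed : expiredFile;
        }

        @Override
        public boolean hasBeenPerformed(final File expiredFile) {
            return expiredFile.getName().endsWith(".expired");
        }
    }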
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
index 7db04aa..70bf36e 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
@@ -49,9 +49,9 @@ public class DeleteIndexAction implements ExpirationAction {
long numDeleted = 0;
long maxEventId = -1L;
try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, repository.getAllLogFiles())) {
- maxEventId = reader.getMaxEventId();
+ maxEventId = reader.getMaxEventId();
} catch (final IOException ioe) {
- logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath());
+ logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath());
}
// remove the records from the index
@@ -68,19 +68,19 @@ public class DeleteIndexAction implements ExpirationAction {
deleteDir = (docsLeft <= 0);
logger.debug("After expiring {}, there are {} docs left for index {}", expiredFile, docsLeft, indexingDirectory);
} finally {
- indexManager.returnIndexWriter(indexingDirectory, writer);
+ indexManager.returnIndexWriter(indexingDirectory, writer);
}
// we've confirmed that all documents have been removed. Delete the index directory.
if (deleteDir) {
- indexManager.removeIndex(indexingDirectory);
+ indexManager.removeIndex(indexingDirectory);
indexConfiguration.removeIndexDirectory(indexingDirectory);
-
+
deleteDirectory(indexingDirectory);
logger.info("Removed empty index directory {}", indexingDirectory);
}
}
-
+
// Update the minimum index to 1 more than the max Event ID in this file.
if (maxEventId > -1L) {
indexConfiguration.setMinIdIndexed(maxEventId + 1L);
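
The logic around this hunk deletes every indexed document that references the expired journal, then removes the index directory once nothing remains. Reduced to bare Lucene 4.x calls, the core step is roughly the following (the field name is illustrative; the real constants live in FieldNames):

    import java.io.IOException;

    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.Term;

    final class ExpireFromIndex {
        // Delete all documents that point at the expired journal, then report how
        // many documents remain so the caller can decide whether to drop the directory.
        static int deleteDocsFor(final IndexWriter writer, final String storageFilename) throws IOException {
            writer.deleteDocuments(new Term("storage-filename", storageFilename));
            writer.commit();
            return writer.numDocs(); // 0 means the index directory can be removed
        }
    }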
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
index 5a77f42..98137fb 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
@@ -45,12 +45,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DocsReader {
- private final Logger logger = LoggerFactory.getLogger(DocsReader.class);
-
+ private final Logger logger = LoggerFactory.getLogger(DocsReader.class);
+
public DocsReader(final List<File> storageDirectories) {
}
- public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException {
+ public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles,
+ final AtomicInteger retrievalCount, final int maxResults) throws IOException {
if (retrievalCount.get() >= maxResults) {
return Collections.emptySet();
}
@@ -73,42 +74,42 @@ public class DocsReader {
return read(docs, allProvenanceLogFiles);
}
-
+
private long getByteOffset(final Document d, final RecordReader reader) {
final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
if ( blockField != null ) {
- final int blockIndex = blockField.numericValue().intValue();
- final TocReader tocReader = reader.getTocReader();
- return tocReader.getBlockOffset(blockIndex);
+ final int blockIndex = blockField.numericValue().intValue();
+ final TocReader tocReader = reader.getTocReader();
+ return tocReader.getBlockOffset(blockIndex);
}
-
- return d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
+
+ return d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
}
-
-
+
+
private ProvenanceEventRecord getRecord(final Document d, final RecordReader reader) throws IOException {
- IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
- if ( blockField == null ) {
- reader.skipTo(getByteOffset(d, reader));
- } else {
- reader.skipToBlock(blockField.numericValue().intValue());
- }
-
+ IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
+ if ( blockField == null ) {
+ reader.skipTo(getByteOffset(d, reader));
+ } else {
+ reader.skipToBlock(blockField.numericValue().intValue());
+ }
+
StandardProvenanceEventRecord record;
while ( (record = reader.nextRecord()) != null) {
- IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());
- if ( idField == null || idField.numericValue().longValue() == record.getEventId() ) {
- break;
- }
+ IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());
+ if ( idField == null || idField.numericValue().longValue() == record.getEventId() ) {
+ break;
+ }
}
-
+
if ( record == null ) {
- throw new IOException("Failed to find Provenance Event " + d);
+ throw new IOException("Failed to find Provenance Event " + d);
} else {
- return record;
+ return record;
}
}
-
+
public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles) throws IOException {
LuceneUtil.sortDocsForRetrieval(docs);
@@ -119,23 +120,23 @@ public class DocsReader {
final long start = System.nanoTime();
int logFileCount = 0;
-
+
final Set<String> storageFilesToSkip = new HashSet<>();
-
+
try {
for (final Document d : docs) {
final String storageFilename = d.getField(FieldNames.STORAGE_FILENAME).stringValue();
if ( storageFilesToSkip.contains(storageFilename) ) {
- continue;
+ continue;
}
-
+
try {
if (reader != null && storageFilename.equals(lastStorageFilename)) {
- matchingRecords.add(getRecord(d, reader));
+ matchingRecords.add(getRecord(d, reader));
} else {
- logger.debug("Opening log file {}", storageFilename);
-
- logFileCount++;
+ logger.debug("Opening log file {}", storageFilename);
+
+ logFileCount++;
if (reader != null) {
reader.close();
}
@@ -143,20 +144,20 @@ public class DocsReader {
List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
if (potentialFiles.isEmpty()) {
logger.warn("Could not find Provenance Log File with basename {} in the "
- + "Provenance Repository; assuming file has expired and continuing without it", storageFilename);
+ + "Provenance Repository; assuming file has expired and continuing without it", storageFilename);
storageFilesToSkip.add(storageFilename);
continue;
}
if (potentialFiles.size() > 1) {
- throw new FileNotFoundException("Found multiple Provenance Log Files with basename " +
- storageFilename + " in the Provenance Repository");
+ throw new FileNotFoundException("Found multiple Provenance Log Files with basename " +
+ storageFilename + " in the Provenance Repository");
}
for (final File file : potentialFiles) {
try {
- reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles);
- matchingRecords.add(getRecord(d, reader));
+ reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles);
+ matchingRecords.add(getRecord(d, reader));
} catch (final IOException e) {
throw new IOException("Failed to retrieve record " + d + " from Provenance File " + file + " due to " + e, e);
}
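
getByteOffset() and getRecord() above pick between two addressing schemes for each document. A condensed sketch of that decision, independent of the Lucene types (names are illustrative):

    final class OffsetResolver {
        // Prefer the compression-block offset from the table of contents; fall back
        // to the absolute byte offset stored in the index when no block index exists.
        static long resolve(final Integer blockIndex, final long storedByteOffset, final long[] tocBlockOffsets) {
            if (blockIndex != null) {
                return tocBlockOffsets[blockIndex];
            }
            return storedByteOffset;
        }
    }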
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
index 3943504..9c3ec31 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
@@ -41,65 +41,65 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IndexManager implements Closeable {
- private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);
-
- private final Lock lock = new ReentrantLock();
- private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
- private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
-
-
- public void removeIndex(final File indexDirectory) {
- final File absoluteFile = indexDirectory.getAbsoluteFile();
- logger.info("Removing index {}", indexDirectory);
-
- lock.lock();
- try {
- final IndexWriterCount count = writerCounts.remove(absoluteFile);
- if ( count != null ) {
- try {
- count.close();
- } catch (final IOException ioe) {
- logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
- }
-
- for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) {
- for ( final ActiveIndexSearcher searcher : searcherList ) {
- try {
- searcher.close();
- } catch (final IOException ioe) {
- logger.warn("Failed to close Index Searcher {} for {} due to {}",
- searcher.getSearcher(), absoluteFile, ioe);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
- public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
- final File absoluteFile = indexingDirectory.getAbsoluteFile();
- logger.debug("Borrowing index writer for {}", indexingDirectory);
-
- lock.lock();
- try {
- IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
- if ( writerCount == null ) {
- final List<Closeable> closeables = new ArrayList<>();
+ private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);
+
+ private final Lock lock = new ReentrantLock();
+ private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
+ private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
+
+
+ public void removeIndex(final File indexDirectory) {
+ final File absoluteFile = indexDirectory.getAbsoluteFile();
+ logger.info("Removing index {}", indexDirectory);
+
+ lock.lock();
+ try {
+ final IndexWriterCount count = writerCounts.remove(absoluteFile);
+ if ( count != null ) {
+ try {
+ count.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ }
+
+ for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) {
+ for ( final ActiveIndexSearcher searcher : searcherList ) {
+ try {
+ searcher.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Searcher {} for {} due to {}",
+ searcher.getSearcher(), absoluteFile, ioe);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
+ final File absoluteFile = indexingDirectory.getAbsoluteFile();
+ logger.debug("Borrowing index writer for {}", indexingDirectory);
+
+ lock.lock();
+ try {
+ IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+ if ( writerCount == null ) {
+ final List<Closeable> closeables = new ArrayList<>();
final Directory directory = FSDirectory.open(indexingDirectory);
closeables.add(directory);
-
+
try {
- final Analyzer analyzer = new StandardAnalyzer();
- closeables.add(analyzer);
-
+ final Analyzer analyzer = new StandardAnalyzer();
+ closeables.add(analyzer);
+
final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
config.setWriteLockTimeout(300000L);
@@ -107,361 +107,361 @@ public class IndexManager implements Closeable {
writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1);
logger.debug("Providing new index writer for {}", indexingDirectory);
} catch (final IOException ioe) {
- for ( final Closeable closeable : closeables ) {
- try {
- closeable.close();
- } catch (final IOException ioe2) {
- ioe.addSuppressed(ioe2);
- }
- }
-
- throw ioe;
+ for ( final Closeable closeable : closeables ) {
+ try {
+ closeable.close();
+ } catch (final IOException ioe2) {
+ ioe.addSuppressed(ioe2);
+ }
+ }
+
+ throw ioe;
}
-
+
writerCounts.put(absoluteFile, writerCount);
- } else {
- logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
- writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
- writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
- }
-
- return writerCount.getWriter();
- } finally {
- lock.unlock();
- }
- }
-
- public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
- final File absoluteFile = indexingDirectory.getAbsoluteFile();
- logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory);
-
- lock.lock();
- try {
- IndexWriterCount count = writerCounts.remove(absoluteFile);
-
- try {
- if ( count == null ) {
- logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
- + "This could potentially lead to a resource leak", writer, indexingDirectory);
- writer.close();
- } else if ( count.getCount() <= 1 ) {
- // we are finished with this writer.
- logger.debug("Closing Index Writer for {}", indexingDirectory);
- count.close();
- } else {
- // decrement the count.
- logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
- writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
- }
- } catch (final IOException ioe) {
- logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
-
- public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
- final File absoluteFile = indexDir.getAbsoluteFile();
- logger.debug("Borrowing index searcher for {}", indexDir);
-
- lock.lock();
- try {
- // check if we already have a reader cached.
- List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
- if ( currentlyCached == null ) {
- currentlyCached = new ArrayList<>();
- activeSearchers.put(absoluteFile, currentlyCached);
- } else {
- // keep track of any searchers that have been closed so that we can remove them
- // from our cache later.
- final Set<ActiveIndexSearcher> expired = new HashSet<>();
-
- try {
- for ( final ActiveIndexSearcher searcher : currentlyCached ) {
- if ( searcher.isCache() ) {
- final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
- if ( refCount <= 0 ) {
- // if refCount == 0, then the reader has been closed, so we need to discard the searcher
- logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
- + "removing cached searcher", absoluteFile, refCount);
- expired.add(searcher);
- continue;
- }
-
- logger.debug("Providing previously cached index searcher for {}", indexDir);
- return searcher.getSearcher();
- }
- }
- } finally {
- // if we have any expired index searchers, we need to close them and remove them
- // from the cache so that we don't try to use them again later.
- for ( final ActiveIndexSearcher searcher : expired ) {
- try {
- searcher.close();
- } catch (final Exception e) {
- logger.debug("Failed to close 'expired' IndexSearcher {}", searcher);
- }
-
- currentlyCached.remove(searcher);
- }
- }
- }
-
- IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
- if ( writerCount == null ) {
- final Directory directory = FSDirectory.open(absoluteFile);
- logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);
-
- try {
- final DirectoryReader directoryReader = DirectoryReader.open(directory);
- final IndexSearcher searcher = new IndexSearcher(directoryReader);
-
- // we want to cache the searcher that we create, since it's just a reader.
- final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true);
- currentlyCached.add(cached);
-
- return cached.getSearcher();
- } catch (final IOException e) {
- try {
- directory.close();
- } catch (final IOException ioe) {
- e.addSuppressed(ioe);
- }
-
- throw e;
- }
- } else {
- logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing "
- + "counter to {}", indexDir, writerCount.getCount() + 1);
-
- // increment the writer count to ensure that it's kept open.
- writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
- writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
-
- // create a new Index Searcher from the writer so that we don't have an issue with trying
- // to read from a directory that's locked. If we get the "no segments* file found" with
- // Lucene, this indicates that an IndexWriter already has the directory open.
- final IndexWriter writer = writerCount.getWriter();
- final DirectoryReader directoryReader = DirectoryReader.open(writer, false);
- final IndexSearcher searcher = new IndexSearcher(directoryReader);
-
- // we don't want to cache this searcher because it's based on a writer, so we want to get
- // new values the next time that we search.
- final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false);
-
- currentlyCached.add(activeSearcher);
- return activeSearcher.getSearcher();
- }
- } finally {
- lock.unlock();
- }
- }
-
-
- public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
- final File absoluteFile = indexDirectory.getAbsoluteFile();
- logger.debug("Returning index searcher for {} to IndexManager", indexDirectory);
-
- lock.lock();
- try {
- // check if we already have a reader cached.
- List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
- if ( currentlyCached == null ) {
- logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
- + "result in a resource leak", indexDirectory);
- return;
- }
-
- final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator();
- while (itr.hasNext()) {
- final ActiveIndexSearcher activeSearcher = itr.next();
- if ( activeSearcher.getSearcher().equals(searcher) ) {
- if ( activeSearcher.isCache() ) {
- // the searcher is cached. Just leave it open.
- logger.debug("Index searcher for {} is cached; leaving open", indexDirectory);
- return;
- } else {
- // searcher is not cached. It was created from a writer, and we want
- // the newest updates the next time that we get a searcher, so we will
- // go ahead and close this one out.
- itr.remove();
-
- // decrement the writer count because we incremented it when creating the searcher
- final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
- if ( writerCount != null ) {
- if ( writerCount.getCount() <= 1 ) {
- try {
- logger.debug("Index searcher for {} is not cached. Writer count is "
- + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1);
-
- writerCount.close();
- } catch (final IOException ioe) {
- logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
- } else {
- logger.debug("Index searcher for {} is not cached. Writer count is decremented "
- + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);
-
- writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
- writerCount.getAnalyzer(), writerCount.getDirectory(),
- writerCount.getCount() - 1));
- }
- }
-
- try {
- logger.debug("Closing Index Searcher for {}", indexDirectory);
- activeSearcher.close();
- } catch (final IOException ioe) {
- logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
- }
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public void close() throws IOException {
- logger.debug("Closing Index Manager");
-
- lock.lock();
- try {
- IOException ioe = null;
-
- for ( final IndexWriterCount count : writerCounts.values() ) {
- try {
- count.close();
- } catch (final IOException e) {
- if ( ioe == null ) {
- ioe = e;
- } else {
- ioe.addSuppressed(e);
- }
- }
- }
-
- for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) {
- for (final ActiveIndexSearcher searcher : searcherList) {
- try {
- searcher.close();
- } catch (final IOException e) {
- if ( ioe == null ) {
- ioe = e;
- } else {
- ioe.addSuppressed(e);
- }
- }
- }
- }
-
- if ( ioe != null ) {
- throw ioe;
- }
- } finally {
- lock.unlock();
- }
- }
-
-
- private static void close(final Closeable... closeables) throws IOException {
- IOException ioe = null;
- for ( final Closeable closeable : closeables ) {
- if ( closeable == null ) {
- continue;
- }
-
- try {
- closeable.close();
- } catch (final IOException e) {
- if ( ioe == null ) {
- ioe = e;
- } else {
- ioe.addSuppressed(e);
- }
- }
- }
-
- if ( ioe != null ) {
- throw ioe;
- }
- }
-
-
- private static class ActiveIndexSearcher implements Closeable {
- private final IndexSearcher searcher;
- private final DirectoryReader directoryReader;
- private final Directory directory;
- private final boolean cache;
-
- public ActiveIndexSearcher(IndexSearcher searcher, DirectoryReader directoryReader,
- Directory directory, final boolean cache) {
- this.searcher = searcher;
- this.directoryReader = directoryReader;
- this.directory = directory;
- this.cache = cache;
- }
-
- public boolean isCache() {
- return cache;
- }
-
- public IndexSearcher getSearcher() {
- return searcher;
- }
-
- @Override
- public void close() throws IOException {
- IndexManager.close(directoryReader, directory);
- }
- }
-
-
- private static class IndexWriterCount implements Closeable {
- private final IndexWriter writer;
- private final Analyzer analyzer;
- private final Directory directory;
- private final int count;
-
- public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
- this.writer = writer;
- this.analyzer = analyzer;
- this.directory = directory;
- this.count = count;
- }
-
- public Analyzer getAnalyzer() {
- return analyzer;
- }
-
- public Directory getDirectory() {
- return directory;
- }
-
- public IndexWriter getWriter() {
- return writer;
- }
-
- public int getCount() {
- return count;
- }
-
- @Override
- public void close() throws IOException {
- IndexManager.close(writer, analyzer, directory);
- }
- }
+ } else {
+ logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
+ writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+ writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+ }
+
+ return writerCount.getWriter();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
+ final File absoluteFile = indexingDirectory.getAbsoluteFile();
+ logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory);
+
+ lock.lock();
+ try {
+ IndexWriterCount count = writerCounts.remove(absoluteFile);
+
+ try {
+ if ( count == null ) {
+ logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
+ + "This could potentially lead to a resource leak", writer, indexingDirectory);
+ writer.close();
+ } else if ( count.getCount() <= 1 ) {
+ // we are finished with this writer.
+ logger.debug("Closing Index Writer for {}", indexingDirectory);
+ count.close();
+ } else {
+ // decrement the count.
+ logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
+ writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
+ }
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+
+ public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
+ final File absoluteFile = indexDir.getAbsoluteFile();
+ logger.debug("Borrowing index searcher for {}", indexDir);
+
+ lock.lock();
+ try {
+ // check if we already have a reader cached.
+ List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
+ if ( currentlyCached == null ) {
+ currentlyCached = new ArrayList<>();
+ activeSearchers.put(absoluteFile, currentlyCached);
+ } else {
+ // keep track of any searchers that have been closed so that we can remove them
+ // from our cache later.
+ final Set<ActiveIndexSearcher> expired = new HashSet<>();
+
+ try {
+ for ( final ActiveIndexSearcher searcher : currentlyCached ) {
+ if ( searcher.isCache() ) {
+ final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
+ if ( refCount <= 0 ) {
+ // if refCount == 0, then the reader has been closed, so we need to discard the searcher
+ logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
+ + "removing cached searcher", absoluteFile, refCount);
+ expired.add(searcher);
+ continue;
+ }
+
+ logger.debug("Providing previously cached index searcher for {}", indexDir);
+ return searcher.getSearcher();
+ }
+ }
+ } finally {
+ // if we have any expired index searchers, we need to close them and remove them
+ // from the cache so that we don't try to use them again later.
+ for ( final ActiveIndexSearcher searcher : expired ) {
+ try {
+ searcher.close();
+ } catch (final Exception e) {
+ logger.debug("Failed to close 'expired' IndexSearcher {}", searcher);
+ }
+
+ currentlyCached.remove(searcher);
+ }
+ }
+ }
+
+ IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+ if ( writerCount == null ) {
+ final Directory directory = FSDirectory.open(absoluteFile);
+ logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);
+
+ try {
+ final DirectoryReader directoryReader = DirectoryReader.open(directory);
+ final IndexSearcher searcher = new IndexSearcher(directoryReader);
+
+ // we want to cache the searcher that we create, since it's just a reader.
+ final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true);
+ currentlyCached.add(cached);
+
+ return cached.getSearcher();
+ } catch (final IOException e) {
+ try {
+ directory.close();
+ } catch (final IOException ioe) {
+ e.addSuppressed(ioe);
+ }
+
+ throw e;
+ }
+ } else {
+ logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing "
+ + "counter to {}", indexDir, writerCount.getCount() + 1);
+
+ // increment the writer count to ensure that it's kept open.
+ writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+ writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+
+ // create a new Index Searcher from the writer so that we don't have an issue with trying
+ // to read from a directory that's locked. If we get the "no segments* file found" with
+ // Lucene, this indicates that an IndexWriter already has the directory open.
+ final IndexWriter writer = writerCount.getWriter();
+ final DirectoryReader directoryReader = DirectoryReader.open(writer, false);
+ final IndexSearcher searcher = new IndexSearcher(directoryReader);
+
+ // we don't want to cache this searcher because it's based on a writer, so we want to get
+ // new values the next time that we search.
+ final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false);
+
+ currentlyCached.add(activeSearcher);
+ return activeSearcher.getSearcher();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+
+ public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
+ final File absoluteFile = indexDirectory.getAbsoluteFile();
+ logger.debug("Returning index searcher for {} to IndexManager", indexDirectory);
+
+ lock.lock();
+ try {
+ // check if we already have a reader cached.
+ List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
+ if ( currentlyCached == null ) {
+ logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
+ + "result in a resource leak", indexDirectory);
+ return;
+ }
+
+ final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator();
+ while (itr.hasNext()) {
+ final ActiveIndexSearcher activeSearcher = itr.next();
+ if ( activeSearcher.getSearcher().equals(searcher) ) {
+ if ( activeSearcher.isCache() ) {
+ // the searcher is cached. Just leave it open.
+ logger.debug("Index searcher for {} is cached; leaving open", indexDirectory);
+ return;
+ } else {
+ // searcher is not cached. It was created from a writer, and we want
+ // the newest updates the next time that we get a searcher, so we will
+ // go ahead and close this one out.
+ itr.remove();
+
+ // decrement the writer count because we incremented it when creating the searcher
+ final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+ if ( writerCount != null ) {
+ if ( writerCount.getCount() <= 1 ) {
+ try {
+ logger.debug("Index searcher for {} is not cached. Writer count is "
+ + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1);
+
+ writerCount.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ } else {
+ logger.debug("Index searcher for {} is not cached. Writer count is decremented "
+ + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);
+
+ writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+ writerCount.getAnalyzer(), writerCount.getDirectory(),
+ writerCount.getCount() - 1));
+ }
+ }
+
+ try {
+ logger.debug("Closing Index Searcher for {}", indexDirectory);
+ activeSearcher.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ }
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ logger.debug("Closing Index Manager");
+
+ lock.lock();
+ try {
+ IOException ioe = null;
+
+ for ( final IndexWriterCount count : writerCounts.values() ) {
+ try {
+ count.close();
+ } catch (final IOException e) {
+ if ( ioe == null ) {
+ ioe = e;
+ } else {
+ ioe.addSuppressed(e);
+ }
+ }
+ }
+
+ for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) {
+ for (final ActiveIndexSearcher searcher : searcherList) {
+ try {
+ searcher.close();
+ } catch (final IOException e) {
+ if ( ioe == null ) {
+ ioe = e;
+ } else {
+ ioe.addSuppressed(e);
+ }
+ }
+ }
+ }
+
+ if ( ioe != null ) {
+ throw ioe;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+
+ private static void close(final Closeable... closeables) throws IOException {
+ IOException ioe = null;
+ for ( final Closeable closeable : closeables ) {
+ if ( closeable == null ) {
+ continue;
+ }
+
+ try {
+ closeable.close();
+ } catch (final IOException e) {
+ if ( ioe == null ) {
+ ioe = e;
+ } else {
+ ioe.addSuppressed(e);
+ }
+ }
+ }
+
+ if ( ioe != null ) {
+ throw ioe;
+ }
+ }
+
+
+ private static class ActiveIndexSearcher implements Closeable {
+ private final IndexSearcher searcher;
+ private final DirectoryReader directoryReader;
+ private final Directory directory;
+ private final boolean cache;
+
+ public ActiveIndexSearcher(IndexSearcher searcher, DirectoryReader directoryReader,
+ Directory directory, final boolean cache) {
+ this.searcher = searcher;
+ this.directoryReader = directoryReader;
+ this.directory = directory;
+ this.cache = cache;
+ }
+
+ public boolean isCache() {
+ return cache;
+ }
+
+ public IndexSearcher getSearcher() {
+ return searcher;
+ }
+
+ @Override
+ public void close() throws IOException {
+ IndexManager.close(directoryReader, directory);
+ }
+ }
+
+
+ private static class IndexWriterCount implements Closeable {
+ private final IndexWriter writer;
+ private final Analyzer analyzer;
+ private final Directory directory;
+ private final int count;
+
+ public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
+ this.writer = writer;
+ this.analyzer = analyzer;
+ this.directory = directory;
+ this.count = count;
+ }
+
+ public Analyzer getAnalyzer() {
+ return analyzer;
+ }
+
+ public Directory getDirectory() {
+ return directory;
+ }
+
+ public IndexWriter getWriter() {
+ return writer;
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ @Override
+ public void close() throws IOException {
+ IndexManager.close(writer, analyzer, directory);
+ }
+ }
}
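
Nearly all of IndexManager is re-indented here, but the structure it preserves is worth spelling out: borrowIndexWriter() increments a per-directory count, returnIndexWriter() decrements it, and the writer is closed only when the last borrower returns it. A minimal JDK-only distillation of that pattern (not NiFi API; names are illustrative):

    import java.io.Closeable;
    import java.io.File;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.Callable;

    final class RefCountedCache<R extends Closeable> {
        private static final class Entry<R> {
            final R resource;
            int count;

            Entry(final R resource) {
                this.resource = resource;
            }
        }

        private final Map<File, Entry<R>> entries = new HashMap<>();

        // Open the resource on first borrow; every later borrow bumps the count.
        synchronized R borrow(final File key, final Callable<R> factory) throws Exception {
            Entry<R> entry = entries.get(key);
            if (entry == null) {
                entry = new Entry<>(factory.call());
                entries.put(key, entry);
            }
            entry.count++;
            return entry.resource;
        }

        // Close the resource only when the last borrower gives it back.
        synchronized void giveBack(final File key) throws IOException {
            final Entry<R> entry = entries.get(key);
            if (entry != null && --entry.count <= 0) {
                entries.remove(key);
                entry.resource.close();
            }
        }
    }

This is also why returnIndexSearcher() decrements the writer count for non-cached searchers: borrowIndexSearcher() incremented it when it built the searcher from a live writer.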
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
index dcb6e08..53869f4 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
@@ -35,7 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IndexSearch {
- private final Logger logger = LoggerFactory.getLogger(IndexSearch.class);
+ private final Logger logger = LoggerFactory.getLogger(IndexSearch.class);
private final PersistentProvenanceRepository repository;
private final File indexDirectory;
private final IndexManager indexManager;
@@ -65,17 +65,17 @@ public class IndexSearch {
final long start = System.nanoTime();
IndexSearcher searcher = null;
try {
- searcher = indexManager.borrowIndexSearcher(indexDirectory);
+ searcher = indexManager.borrowIndexSearcher(indexDirectory);
final long searchStartNanos = System.nanoTime();
final long openSearcherNanos = searchStartNanos - start;
-
+
final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults());
final long finishSearch = System.nanoTime();
final long searchNanos = finishSearch - searchStartNanos;
-
- logger.debug("Searching {} took {} millis; opening searcher took {} millis", this,
- TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
-
+
+ logger.debug("Searching {} took {} millis; opening searcher took {} millis", this,
+ TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
+
if (topDocs.totalHits == 0) {
sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
return sqr;
@@ -83,31 +83,31 @@ public class IndexSearch {
final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories());
matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount, provenanceQuery.getMaxResults());
-
+
final long readRecordsNanos = System.nanoTime() - finishSearch;
logger.debug("Reading {} records took {} millis for {}", matchingRecords.size(), TimeUnit.NANOSECONDS.toMillis(readRecordsNanos), this);
-
+
sqr.update(matchingRecords, topDocs.totalHits);
return sqr;
} catch (final FileNotFoundException e) {
// nothing has been indexed yet, or the data has already aged off
- logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, e);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", e);
- }
-
+ logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, e);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", e);
+ }
+
sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
return sqr;
} finally {
- if ( searcher != null ) {
- indexManager.returnIndexSearcher(indexDirectory, searcher);
- }
+ if ( searcher != null ) {
+ indexManager.returnIndexSearcher(indexDirectory, searcher);
+ }
}
}
-
+
@Override
public String toString() {
- return "IndexSearcher[" + indexDirectory + "]";
+ return "IndexSearcher[" + indexDirectory + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
index 5e87913..46be391 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
@@ -16,50 +16,30 @@
*/
package org.apache.nifi.provenance.lucene;
-import java.io.EOFException;
-import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.IntField;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.provenance.IndexConfiguration;
import org.apache.nifi.provenance.PersistentProvenanceRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
-import org.apache.nifi.provenance.rollover.RolloverAction;
import org.apache.nifi.provenance.search.SearchableField;
-import org.apache.nifi.provenance.serialization.RecordReader;
-import org.apache.nifi.provenance.serialization.RecordReaders;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class IndexingAction implements RolloverAction {
-
- private final PersistentProvenanceRepository repository;
+public class IndexingAction {
private final Set<SearchableField> nonAttributeSearchableFields;
private final Set<SearchableField> attributeSearchableFields;
- private final IndexConfiguration indexConfiguration;
- private static final Logger logger = LoggerFactory.getLogger(IndexingAction.class);
-
- public IndexingAction(final PersistentProvenanceRepository repo, final IndexConfiguration indexConfig) {
- repository = repo;
- indexConfiguration = indexConfig;
+ public IndexingAction(final PersistentProvenanceRepository repo) {
attributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(repo.getConfiguration().getSearchableAttributes()));
nonAttributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(repo.getConfiguration().getSearchableFields()));
}
@@ -72,7 +52,7 @@ public class IndexingAction implements RolloverAction {
doc.add(new StringField(field.getSearchableFieldName(), value.toLowerCase(), store));
}
-
+
public void index(final StandardProvenanceEventRecord record, final IndexWriter indexWriter, final Integer blockIndex) throws IOException {
final Map<String, String> attributes = record.getAttributes();
@@ -105,14 +85,14 @@ public class IndexingAction implements RolloverAction {
doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), record.getEventTime(), Store.NO));
doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), record.getFileSize(), Store.NO));
doc.add(new StringField(FieldNames.STORAGE_FILENAME, storageFilename, Store.YES));
-
+
if ( blockIndex == null ) {
- doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset(), Store.YES));
+ doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset(), Store.YES));
} else {
- doc.add(new IntField(FieldNames.BLOCK_INDEX, blockIndex, Store.YES));
- doc.add(new LongField(SearchableFields.Identifier.getSearchableFieldName(), record.getEventId(), Store.YES));
+ doc.add(new IntField(FieldNames.BLOCK_INDEX, blockIndex, Store.YES));
+ doc.add(new LongField(SearchableFields.Identifier.getSearchableFieldName(), record.getEventId(), Store.YES));
}
-
+
for (final String lineageIdentifier : record.getLineageIdentifiers()) {
addField(doc, SearchableFields.LineageIdentifier, lineageIdentifier, Store.NO);
}
@@ -150,87 +130,4 @@ public class IndexingAction implements RolloverAction {
indexWriter.addDocument(doc);
}
}
-
- @Override
- public File execute(final File fileRolledOver) throws IOException {
- final File indexingDirectory = indexConfiguration.getWritableIndexDirectory(fileRolledOver);
- int indexCount = 0;
- long maxId = -1L;
-
- try (final Directory directory = FSDirectory.open(indexingDirectory);
- final Analyzer analyzer = new StandardAnalyzer()) {
-
- final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
- config.setWriteLockTimeout(300000L);
-
- try (final IndexWriter indexWriter = new IndexWriter(directory, config);
- final RecordReader reader = RecordReaders.newRecordReader(fileRolledOver, repository.getAllLogFiles())) {
- StandardProvenanceEventRecord record;
- while (true) {
- final Integer blockIndex;
- if ( reader.isBlockIndexAvailable() ) {
- blockIndex = reader.getBlockIndex();
- } else {
- blockIndex = null;
- }
-
- try {
- record = reader.nextRecord();
- } catch (final EOFException eof) {
- // system was restarted while writing to the log file. Nothing we can do here, so ignore this record.
- // On system restart, the FlowFiles should be back in their "original" queues, so the events will be re-created
- // when the data is re-processed
- break;
- }
-
- if (record == null) {
- break;
- }
-
- maxId = record.getEventId();
-
- index(record, indexWriter, blockIndex);
- indexCount++;
- }
-
- indexWriter.commit();
- } catch (final EOFException eof) {
- // nothing in the file. Move on.
- }
- } finally {
- if (maxId >= -1) {
- indexConfiguration.setMaxIdIndexed(maxId);
- }
- }
-
- final File newFile = new File(fileRolledOver.getParent(),
- LuceneUtil.substringBeforeLast(fileRolledOver.getName(), ".")
- + ".indexed."
- + LuceneUtil.substringAfterLast(fileRolledOver.getName(), "."));
-
- boolean renamed = false;
- for (int i = 0; i < 10 && !renamed; i++) {
- renamed = fileRolledOver.renameTo(newFile);
- if (!renamed) {
- try {
- Thread.sleep(25L);
- } catch (final InterruptedException e) {
- }
- }
- }
-
- if (renamed) {
- logger.info("Finished indexing Provenance Log File {} to index {} with {} records indexed and renamed file to {}",
- fileRolledOver, indexingDirectory, indexCount, newFile);
- return newFile;
- } else {
- logger.warn("Finished indexing Provenance Log File {} with {} records indexed but failed to rename file to {}; indexed {} records", new Object[]{fileRolledOver, indexCount, newFile, indexCount});
- return fileRolledOver;
- }
- }
-
- @Override
- public boolean hasBeenPerformed(final File fileRolledOver) {
- return fileRolledOver.getName().contains(".indexed.");
- }
}
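
With the RolloverAction machinery stripped out, IndexingAction is pure Lucene document construction. The underlying 4.x pattern, reduced to a skeleton (field names are illustrative; the real ones come from SearchableFields and FieldNames):

    import java.io.IOException;

    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field.Store;
    import org.apache.lucene.document.LongField;
    import org.apache.lucene.document.StringField;
    import org.apache.lucene.index.IndexWriter;

    final class EventIndexer {
        // Store what is needed to relocate the record on disk (filename plus offset);
        // index-but-don't-store values that are only searched, never retrieved.
        static void index(final IndexWriter writer, final String storageFilename,
                final long eventTime, final long byteOffset) throws IOException {
            final Document doc = new Document();
            doc.add(new StringField("storage-filename", storageFilename, Store.YES));
            doc.add(new LongField("event-time", eventTime, Store.NO));
            doc.add(new LongField("storage-file-offset", byteOffset, Store.YES));
            writer.addDocument(doc);
        }
    }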
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
index 54cde15..3f75c00 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
@@ -48,7 +48,8 @@ public class LineageQuery {
public static final int MAX_LINEAGE_UUIDS = 100;
private static final Logger logger = LoggerFactory.getLogger(LineageQuery.class);
- public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final File indexDirectory, final String lineageIdentifier, final Collection<String> flowFileUuids) throws IOException {
+ public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final File indexDirectory,
+ final String lineageIdentifier, final Collection<String> flowFileUuids) throws IOException {
if (requireNonNull(flowFileUuids).size() > MAX_LINEAGE_UUIDS) {
throw new IllegalArgumentException(String.format("Cannot compute lineage for more than %s FlowFiles. This lineage contains %s.", MAX_LINEAGE_UUIDS, flowFileUuids.size()));
}
@@ -99,7 +100,8 @@ public class LineageQuery {
final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories());
final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, indexReader, repo.getAllLogFiles(), new AtomicInteger(0), Integer.MAX_VALUE);
final long readDocsEnd = System.nanoTime();
- logger.debug("Finished Lineage Query; Lucene search took {} millis, reading records took {} millis", TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd));
+ logger.debug("Finished Lineage Query; Lucene search took {} millis, reading records took {} millis",
+ TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd));
return recs;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
index 59dc10b..c622ea1 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
@@ -78,16 +78,16 @@ public class LuceneUtil {
final String searchString = baseName + ".";
for (final Path path : allProvenanceLogs) {
if (path.toFile().getName().startsWith(searchString)) {
- final File file = path.toFile();
- if ( file.exists() ) {
- matchingFiles.add(file);
- } else {
- final File dir = file.getParentFile();
- final File gzFile = new File(dir, file.getName() + ".gz");
- if ( gzFile.exists() ) {
- matchingFiles.add(gzFile);
- }
- }
+ final File file = path.toFile();
+ if ( file.exists() ) {
+ matchingFiles.add(file);
+ } else {
+ final File dir = file.getParentFile();
+ final File gzFile = new File(dir, file.getName() + ".gz");
+ if ( gzFile.exists() ) {
+ matchingFiles.add(gzFile);
+ }
+ }
}
}
@@ -144,16 +144,16 @@ public class LuceneUtil {
final IndexableField fileOffset1 = o1.getField(FieldNames.BLOCK_INDEX);
final IndexableField fileOffset2 = o2.getField(FieldNames.BLOCK_INDEX);
if ( fileOffset1 != null && fileOffset2 != null ) {
- final int blockIndexResult = Long.compare(fileOffset1.numericValue().longValue(), fileOffset2.numericValue().longValue());
- if ( blockIndexResult != 0 ) {
- return blockIndexResult;
- }
-
- final long eventId1 = o1.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
- final long eventId2 = o2.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
- return Long.compare(eventId1, eventId2);
+ final int blockIndexResult = Long.compare(fileOffset1.numericValue().longValue(), fileOffset2.numericValue().longValue());
+ if ( blockIndexResult != 0 ) {
+ return blockIndexResult;
+ }
+
+ final long eventId1 = o1.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
+ final long eventId2 = o2.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
+ return Long.compare(eventId1, eventId2);
}
-
+
final long offset1 = o1.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
final long offset2 = o2.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
return Long.compare(offset1, offset2);
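
The comparator re-indented above orders documents so each journal can be read in one forward pass: by block index, then by event id within a block, falling back to the raw byte offset when no block index is present. The same ordering over a plain value type:

    import java.util.Comparator;

    final class DocKey {
        final Long blockIndex; // null when the journal has no table of contents
        final long eventId;
        final long byteOffset;

        DocKey(final Long blockIndex, final long eventId, final long byteOffset) {
            this.blockIndex = blockIndex;
            this.eventId = eventId;
            this.byteOffset = byteOffset;
        }

        static final Comparator<DocKey> RETRIEVAL_ORDER = new Comparator<DocKey>() {
            @Override
            public int compare(final DocKey a, final DocKey b) {
                if (a.blockIndex != null && b.blockIndex != null) {
                    final int byBlock = Long.compare(a.blockIndex, b.blockIndex);
                    return byBlock != 0 ? byBlock : Long.compare(a.eventId, b.eventId);
                }
                return Long.compare(a.byteOffset, b.byteOffset);
            }
        };
    }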
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/CompressionAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/CompressionAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/CompressionAction.java
deleted file mode 100644
index d014618..0000000
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/CompressionAction.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.rollover;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.nifi.stream.io.GZIPOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.provenance.lucene.IndexingAction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CompressionAction implements RolloverAction {
-
- private static final Logger logger = LoggerFactory.getLogger(IndexingAction.class);
-
- @Override
- public File execute(final File fileRolledOver) throws IOException {
- final File gzFile = new File(fileRolledOver.getParent(), fileRolledOver.getName() + ".gz");
- try (final FileInputStream in = new FileInputStream(fileRolledOver);
- final OutputStream fos = new FileOutputStream(gzFile);
- final GZIPOutputStream gzipOut = new GZIPOutputStream(fos, 1)) {
- StreamUtils.copy(in, gzipOut);
- in.getFD().sync();
- }
-
- boolean deleted = false;
- for (int i = 0; i < 10 && !deleted; i++) {
- deleted = fileRolledOver.delete();
- }
-
- logger.info("Finished compressing Provenance Log File {}", fileRolledOver);
- return gzFile;
- }
-
- @Override
- public boolean hasBeenPerformed(final File fileRolledOver) {
- return fileRolledOver.getName().contains(".gz");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/RolloverAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/RolloverAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/RolloverAction.java
deleted file mode 100644
index 33401e9..0000000
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/RolloverAction.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.rollover;
-
-import java.io.File;
-import java.io.IOException;
-
-public interface RolloverAction {
-
- /**
- * Performs some action against the given File and returns the new File that
- * contains the modified version
- *
- * @param fileRolledOver
- * @return
- * @throws IOException
- */
- File execute(File fileRolledOver) throws IOException;
-
- boolean hasBeenPerformed(File fileRolledOver);
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
index 8bdc88a..91c8222 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
@@ -24,75 +24,80 @@ import org.apache.nifi.provenance.toc.TocReader;
public interface RecordReader extends Closeable {
- /**
- * Returns the next record in the reader, or <code>null</code> if there is no more data available.
- * @return
- * @throws IOException
- */
+ /**
+ * Returns the next record in the reader, or <code>null</code> if there is no more data available.
+ * @return the next Provenance event in the stream
+ * @throws IOException if unable to read the next event from the stream
+ */
StandardProvenanceEventRecord nextRecord() throws IOException;
/**
* Skips the specified number of bytes
- * @param bytesToSkip
- * @throws IOException
+ * @param bytesToSkip the number of bytes to skip ahead
+ * @throws IOException if unable to skip ahead the specified number of bytes (e.g., the stream does
+ * not contain this many more bytes)
*/
void skip(long bytesToSkip) throws IOException;
/**
* Skips to the specified byte offset in the underlying stream.
- * @param position
+ * @param position the byte offset to skip to
* @throws IOException if the underlying stream throws IOException, or if the reader has already
* passed the specified byte offset
*/
void skipTo(long position) throws IOException;
-
+
/**
* Skips to the specified compression block
- *
- * @param blockIndex
+ *
+ * @param blockIndex the block index to skip to
* @throws IOException if the underlying stream throws IOException, or if the reader has already
* read past the specified compression block index
* @throws IllegalStateException if the RecordReader does not have a TableOfContents associated with it
*/
void skipToBlock(int blockIndex) throws IOException;
-
+
/**
* Returns the block index that the Reader is currently reading from.
* Note that the block index is incremented at the beginning of the {@link #nextRecord()}
- * method. This means that this method will return the block from which the previous record was read,
+ * method. This means that this method will return the block from which the previous record was read,
* if calling {@link #nextRecord()} continually, not the block from which the next record will be read.
- * @return
+ *
+ * @return the current block index
+ * @throws IllegalStateException if the reader is reading a provenance event file that does not contain
+ * a Table of Contents
*/
int getBlockIndex();
-
+
/**
* Returns <code>true</code> if the compression block index is available. It will be available
* if and only if the reader is created with a TableOfContents
- *
- * @return
+ *
+ * @return true if the reader is reading from an event file that has a Table of Contents
*/
boolean isBlockIndexAvailable();
-
+
/**
* Returns the {@link TocReader} that is used to keep track of compression blocks, if one exists,
* <code>null</code> otherwise
- * @return
+ *
+ * @return the TocReader if the underlying event file has a Table of Contents, <code>null</code> otherwise.
*/
TocReader getTocReader();
-
+
/**
- * Returns the number of bytes that have been consumed from the stream (read or skipped).
- * @return
+ * @return the number of bytes that have been consumed from the stream (read or skipped).
*/
long getBytesConsumed();
-
+
/**
* Returns the ID of the last event in this record reader, or -1 if the reader has no records or
* has already read through all records. Note: This method will consume the stream until the end,
* so no more records will be available on this reader after calling this method.
- *
- * @return
- * @throws IOException
+ *
+ * @return the ID of the last event in this record reader, or -1 if the reader has no records or
+ * has already read through all records
+ * @throws IOException if unable to determine the ID of the last event
*/
long getMaxEventId() throws IOException;
}
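
Taken together, these javadocs describe the read-side contract: nextRecord() returns events until it yields null, and when a Table of Contents is present the reader can seek by compression block instead of scanning. A hedged usage sketch follows; the two-argument factory signature newRecordReader(File, Collection<Path>) is assumed from the RecordReaders class below (its body checks provenanceLogFiles for null, so passing null is safe), and getEventId() is assumed on the returned event:

    import java.io.File;
    import java.io.IOException;

    import org.apache.nifi.provenance.StandardProvenanceEventRecord;
    import org.apache.nifi.provenance.serialization.RecordReader;
    import org.apache.nifi.provenance.serialization.RecordReaders;

    public class ReadJournalSketch {
        public static void main(final String[] args) throws IOException {
            final File journal = new File(args[0]);

            // null: no fallback list of provenance log files to search
            try (final RecordReader reader = RecordReaders.newRecordReader(journal, null)) {
                // jump directly to a compression block only if a Table of Contents exists;
                // getBlockIndex()/skipToBlock() throw IllegalStateException otherwise
                if (reader.isBlockIndexAvailable()) {
                    reader.skipToBlock(0);
                }

                StandardProvenanceEventRecord record;
                while ((record = reader.nextRecord()) != null) {
                    // getBlockIndex() reports the block the *previous* record came from
                    final int block = reader.isBlockIndexAvailable() ? reader.getBlockIndex() : -1;
                    System.out.println("event " + record.getEventId() + " from block " + block
                            + ", bytes consumed so far: " + reader.getBytesConsumed());
                }
            }
        }
    }
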
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
index dff281c..cab5e6f 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
@@ -37,75 +37,75 @@ public class RecordReaders {
InputStream fis = null;
try {
- if (!file.exists()) {
- if (provenanceLogFiles != null) {
- final String baseName = LuceneUtil.substringBefore(file.getName(), ".") + ".";
- for (final Path path : provenanceLogFiles) {
- if (path.toFile().getName().startsWith(baseName)) {
- file = path.toFile();
- break;
- }
- }
- }
- }
-
- if ( file.exists() ) {
- try {
- fis = new FileInputStream(file);
- } catch (final FileNotFoundException fnfe) {
- fis = null;
- }
- }
-
- String filename = file.getName();
- openStream: while ( fis == null ) {
- final File dir = file.getParentFile();
- final String baseName = LuceneUtil.substringBefore(file.getName(), ".");
-
- // depending on which rollover actions have occurred, we could have 3 possibilities for the
- // filename that we need. The majority of the time, we will use the extension ".prov.indexed.gz"
- // because most often we are compressing on rollover and most often we have already finished
- // compressing by the time that we are querying the data.
- for ( final String extension : new String[] {".prov.gz", ".prov"} ) {
- file = new File(dir, baseName + extension);
- if ( file.exists() ) {
- try {
- fis = new FileInputStream(file);
- filename = baseName + extension;
- break openStream;
- } catch (final FileNotFoundException fnfe) {
- // file was modified by a RolloverAction after we verified that it exists but before we could
- // create an InputStream for it. Start over.
- fis = null;
- continue openStream;
- }
- }
- }
-
- break;
- }
-
- if ( fis == null ) {
- throw new FileNotFoundException("Unable to locate file " + originalFile);
- }
-
- final File tocFile = TocUtil.getTocFile(file);
- if ( tocFile.exists() ) {
- final TocReader tocReader = new StandardTocReader(tocFile);
- return new StandardRecordReader(fis, filename, tocReader);
- } else {
- return new StandardRecordReader(fis, filename);
- }
+ if (!file.exists()) {
+ if (provenanceLogFiles != null) {
+ final String baseName = LuceneUtil.substringBefore(file.getName(), ".") + ".";
+ for (final Path path : provenanceLogFiles) {
+ if (path.toFile().getName().startsWith(baseName)) {
+ file = path.toFile();
+ break;
+ }
+ }
+ }
+ }
+
+ if ( file.exists() ) {
+ try {
+ fis = new FileInputStream(file);
+ } catch (final FileNotFoundException fnfe) {
+ fis = null;
+ }
+ }
+
+ String filename = file.getName();
+ openStream: while ( fis == null ) {
+ final File dir = file.getParentFile();
+ final String baseName = LuceneUtil.substringBefore(file.getName(), ".");
+
+ // depending on which rollover actions have occurred, we could have 3 possibilities for the
+ // filename that we need. The majority of the time, we will use the extension ".prov.gz"
+ // because most often we are compressing on rollover and most often we have already finished
+ // compressing by the time that we are querying the data.
+ for ( final String extension : new String[] {".prov.gz", ".prov"} ) {
+ file = new File(dir, baseName + extension);
+ if ( file.exists() ) {
+ try {
+ fis = new FileInputStream(file);
+ filename = baseName + extension;
+ break openStream;
+ } catch (final FileNotFoundException fnfe) {
+ // file was modified by a RolloverAction after we verified that it exists but before we could
+ // create an InputStream for it. Start over.
+ fis = null;
+ continue openStream;
+ }
+ }
+ }
+
+ break;
+ }
+
+ if ( fis == null ) {
+ throw new FileNotFoundException("Unable to locate file " + originalFile);
+ }
+
+ final File tocFile = TocUtil.getTocFile(file);
+ if ( tocFile.exists() ) {
+ final TocReader tocReader = new StandardTocReader(tocFile);
+ return new StandardRecordReader(fis, filename, tocReader);
+ } else {
+ return new StandardRecordReader(fis, filename);
+ }
} catch (final IOException ioe) {
- if ( fis != null ) {
- try {
- fis.close();
- } catch (final IOException inner) {
- ioe.addSuppressed(inner);
- }
- }
-
- throw ioe;
+ if ( fis != null ) {
+ try {
+ fis.close();
+ } catch (final IOException inner) {
+ ioe.addSuppressed(inner);
+ }
+ }
+
+ throw ioe;
}
}
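
The labeled openStream loop above exists to handle a time-of-check/time-of-use race: a RolloverAction can rename the file between File.exists() and the FileInputStream constructor, so a FileNotFoundException restarts the scan rather than failing the read. A stripped-down sketch of just that pattern (class and method names here are illustrative, not part of the class above):

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.IOException;

    public class OpenWithRetrySketch {
        static FileInputStream open(final File dir, final String baseName) throws IOException {
            openStream:
            while (true) {
                for (final String extension : new String[] {".prov.gz", ".prov"}) {
                    final File candidate = new File(dir, baseName + extension);
                    if (candidate.exists()) {
                        try {
                            return new FileInputStream(candidate);
                        } catch (final FileNotFoundException fnfe) {
                            // renamed between exists() and open: start the scan over
                            continue openStream;
                        }
                    }
                }
                // no candidate exists under any extension
                throw new FileNotFoundException(baseName + " not found under " + dir);
            }
        }
    }
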
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
index 58f4dc2..d89fd6f 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
@@ -28,31 +28,27 @@ public interface RecordWriter extends Closeable {
/**
* Writes header information to the underlying stream
*
- * @throws IOException
+ * @throws IOException if unable to write header information to the underlying stream
*/
void writeHeader() throws IOException;
/**
* Writes the given record out to the underlying stream
*
- * @param record
- * @param recordIdentifier
+ * @param record the record to write
+ * @param recordIdentifier the identifier to assign to the record being written
* @return the number of bytes written for the given record
- * @throws IOException
+ * @throws IOException if unable to write the record to the stream
*/
long writeRecord(ProvenanceEventRecord record, long recordIdentifier) throws IOException;
/**
- * Returns the number of Records that have been written to this RecordWriter
- *
- * @return
+ * @return the number of Records that have been written to this RecordWriter
*/
int getRecordsWritten();
/**
- * Returns the file that this RecordWriter is writing to
- *
- * @return
+ * @return the file that this RecordWriter is writing to
*/
File getFile();
@@ -73,19 +69,18 @@ public interface RecordWriter extends Closeable {
* not immediately available, returns <code>false</code>; otherwise, obtains
* the lock and returns <code>true</code>.
*
- * @return
+ * @return <code>true</code> if the lock was obtained, <code>false</code> otherwise.
*/
boolean tryLock();
/**
* Syncs the content written to this writer to disk.
- * @throws java.io.IOException
+ * @throws IOException if unable to sync content to disk
*/
void sync() throws IOException;
/**
- * Returns the TOC Writer that is being used to write the Table of Contents for this journal
- * @return
+ * @return the TOC Writer that is being used to write the Table of Contents for this journal
*/
TocWriter getTocWriter();
}
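
On the write side, the contract these javadocs describe is: writeHeader() once, then writeRecord(...) per event (each call returning the bytes written), then sync() to force content to disk before closing; tryLock() exists for callers that need non-blocking exclusive access and is not shown here. A hedged sketch using the three-argument factory confirmed in RecordWriters below; event construction is elided, and any source of ProvenanceEventRecord instances will do:

    import java.io.File;
    import java.io.IOException;

    import org.apache.nifi.provenance.ProvenanceEventRecord;
    import org.apache.nifi.provenance.serialization.RecordWriter;
    import org.apache.nifi.provenance.serialization.RecordWriters;

    public class WriteJournalSketch {
        // appends the given events to a new journal, assigning ids starting at firstEventId
        static void writeEvents(final File journalFile, final Iterable<ProvenanceEventRecord> events,
                final long firstEventId) throws IOException {
            // compressed, with a Table of Contents
            try (final RecordWriter writer = RecordWriters.newRecordWriter(journalFile, true, true)) {
                writer.writeHeader();  // header must precede any records

                long id = firstEventId;
                long bytes = 0L;
                for (final ProvenanceEventRecord event : events) {
                    bytes += writer.writeRecord(event, id++);  // returns bytes written for the record
                }

                writer.sync();  // force written content to disk
                System.out.println(writer.getRecordsWritten() + " records, ~" + bytes + " bytes");
            }
        }
    }
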
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
index 47b7c7e..cf8f7b4 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
@@ -25,14 +25,14 @@ import org.apache.nifi.provenance.toc.TocUtil;
import org.apache.nifi.provenance.toc.TocWriter;
public class RecordWriters {
- private static final int DEFAULT_COMPRESSION_BLOCK_SIZE = 1024 * 1024; // 1 MB
+ private static final int DEFAULT_COMPRESSION_BLOCK_SIZE = 1024 * 1024; // 1 MB
public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc) throws IOException {
- return newRecordWriter(file, compressed, createToc, DEFAULT_COMPRESSION_BLOCK_SIZE);
+ return newRecordWriter(file, compressed, createToc, DEFAULT_COMPRESSION_BLOCK_SIZE);
}
-
+
public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc, final int compressionBlockBytes) throws IOException {
- final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null;
+ final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null;
return new StandardRecordWriter(file, tocWriter, compressed, compressionBlockBytes);
}
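
The four-argument overload exposes the compression block size, which defaults to 1 MB. Smaller blocks should make skipToBlock() on the read side cheaper, since less data must be decompressed to reach a given record, at some likely cost in compression ratio (presumably each block restarts the compressor). A hedged example of tuning it:

    import java.io.File;
    import java.io.IOException;

    import org.apache.nifi.provenance.serialization.RecordWriter;
    import org.apache.nifi.provenance.serialization.RecordWriters;

    public class TunedWriterSketch {
        public static RecordWriter create(final File journalFile) throws IOException {
            // half the 1 MB default, trading compression ratio for cheaper block seeks
            final int blockBytes = 512 * 1024;
            return RecordWriters.newRecordWriter(journalFile, true, true, blockBytes);
        }
    }
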