You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/04/28 16:04:54 UTC
[20/50] [abbrv] incubator-nifi git commit: NIFI-527: Code cleanup
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
index 8944cec..7c13a2a 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
@@ -24,9 +24,9 @@ import java.io.IOException;
/**
* Standard implementation of TocReader.
- *
+ *
* Expects .toc file to be in the following format;
- *
+ *
* byte 0: version
* byte 1: boolean: compressionFlag -> 0 = journal is NOT compressed, 1 = journal is compressed
* byte 2-9: long: offset of block 0
@@ -37,21 +37,21 @@ import java.io.IOException;
public class StandardTocReader implements TocReader {
private final boolean compressed;
private final long[] offsets;
-
+
public StandardTocReader(final File file) throws IOException {
try (final FileInputStream fis = new FileInputStream(file);
- final DataInputStream dis = new DataInputStream(fis)) {
-
+ final DataInputStream dis = new DataInputStream(fis)) {
+
final int version = dis.read();
if ( version < 0 ) {
throw new EOFException();
}
-
+
final int compressionFlag = dis.read();
if ( compressionFlag < 0 ) {
throw new EOFException();
}
-
+
if ( compressionFlag == 0 ) {
compressed = false;
} else if ( compressionFlag == 1 ) {
@@ -59,21 +59,21 @@ public class StandardTocReader implements TocReader {
} else {
throw new IOException("Table of Contents appears to be corrupt: could not read 'compression flag' from header; expected value of 0 or 1 but got " + compressionFlag);
}
-
+
final int numBlocks = (int) ((file.length() - 2) / 8);
offsets = new long[numBlocks];
-
+
for (int i=0; i < numBlocks; i++) {
offsets[i] = dis.readLong();
}
}
}
-
+
@Override
public boolean isCompressed() {
return compressed;
}
-
+
@Override
public long getBlockOffset(final int blockIndex) {
if ( blockIndex >= offsets.length ) {
@@ -89,20 +89,20 @@ public class StandardTocReader implements TocReader {
}
return offsets[offsets.length - 1];
}
-
+
@Override
public void close() throws IOException {
}
- @Override
- public int getBlockIndex(final long blockOffset) {
- for (int i=0; i < offsets.length; i++) {
- if ( offsets[i] > blockOffset ) {
- return i-1;
- }
- }
-
- return offsets.length - 1;
- }
+ @Override
+ public int getBlockIndex(final long blockOffset) {
+ for (int i=0; i < offsets.length; i++) {
+ if ( offsets[i] > blockOffset ) {
+ return i-1;
+ }
+ }
+
+ return offsets.length - 1;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
index 488f225..10de459 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
@@ -19,7 +19,6 @@ package org.apache.nifi.provenance.toc;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
@@ -29,7 +28,7 @@ import org.slf4j.LoggerFactory;
/**
* Standard implementation of {@link TocWriter}.
- *
+ *
* Format of .toc file:
* byte 0: version
* byte 1: compressed: 0 -> not compressed, 1 -> compressed
@@ -39,27 +38,27 @@ import org.slf4j.LoggerFactory;
* byte (N*8+2)-(N*8+9): long: offset of block N
*/
public class StandardTocWriter implements TocWriter {
- private static final Logger logger = LoggerFactory.getLogger(StandardTocWriter.class);
-
+ private static final Logger logger = LoggerFactory.getLogger(StandardTocWriter.class);
+
public static final byte VERSION = 1;
-
+
private final File file;
private final FileOutputStream fos;
private final boolean alwaysSync;
private int index = -1;
-
+
/**
* Creates a StandardTocWriter that writes to the given file.
* @param file the file to write to
* @param compressionFlag whether or not the journal is compressed
- * @throws FileNotFoundException
+ * @throws IOException if unable to write header info to the specified file
*/
public StandardTocWriter(final File file, final boolean compressionFlag, final boolean alwaysSync) throws IOException {
final File tocDir = file.getParentFile();
if ( !tocDir.exists() ) {
- Files.createDirectories(tocDir.toPath());
+ Files.createDirectories(tocDir.toPath());
}
-
+
this.file = file;
fos = new FileOutputStream(file);
this.alwaysSync = alwaysSync;
@@ -69,12 +68,12 @@ public class StandardTocWriter implements TocWriter {
header[1] = (byte) (compressionFlag ? 1 : 0);
fos.write(header);
fos.flush();
-
+
if ( alwaysSync ) {
sync();
}
}
-
+
@Override
public void addBlockOffset(final long offset) throws IOException {
final BufferedOutputStream bos = new BufferedOutputStream(fos);
@@ -83,17 +82,17 @@ public class StandardTocWriter implements TocWriter {
dos.flush();
index++;
logger.debug("Adding block {} at offset {}", index, offset);
-
+
if ( alwaysSync ) {
sync();
}
}
-
+
@Override
public void sync() throws IOException {
- fos.getFD().sync();
+ fos.getFD().sync();
}
-
+
@Override
public int getCurrentBlockIndex() {
return index;
@@ -104,15 +103,15 @@ public class StandardTocWriter implements TocWriter {
if (alwaysSync) {
fos.getFD().sync();
}
-
+
fos.close();
}
-
+
@Override
public File getFile() {
return file;
}
-
+
@Override
public String toString() {
return "TOC Writer for " + file;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
index 7c197be..97e2838 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
@@ -32,27 +32,31 @@ public interface TocReader extends Closeable {
/**
* Indicates whether or not the corresponding Journal file is compressed
- * @return
+ * @return <code>true</code> if the event file is compressed
*/
boolean isCompressed();
/**
* Returns the byte offset into the Journal File for the Block with the given index.
- * @param blockIndex
- * @return
+ *
+ * @param blockIndex the block index to get the byte offset for
+ * @return the byte offset for the given block index, or <code>-1</code> if the given block index
+ * does not exist
*/
long getBlockOffset(int blockIndex);
-
+
/**
* Returns the byte offset into the Journal File of the last Block in the given index
- * @return
+ * @return the byte offset into the Journal File of the last Block in the given index
*/
long getLastBlockOffset();
-
+
/**
* Returns the index of the block that contains the given offset
- * @param blockOffset
- * @return
+ *
+ * @param blockOffset the byte offset for which the block index is desired
+ *
+ * @return the index of the block that contains the given offset
*/
int getBlockIndex(long blockOffset);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
index c30ac98..3fa7d67 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
@@ -22,16 +22,19 @@ import org.apache.nifi.provenance.lucene.LuceneUtil;
public class TocUtil {
- /**
- * Returns the file that should be used as the Table of Contents for the given Journal File
- * @param journalFile
- * @return
- */
- public static File getTocFile(final File journalFile) {
- final File tocDir = new File(journalFile.getParentFile(), "toc");
- final String basename = LuceneUtil.substringBefore(journalFile.getName(), ".");
- final File tocFile = new File(tocDir, basename + ".toc");
- return tocFile;
- }
-
+ /**
+ * Returns the file that should be used as the Table of Contents for the given Journal File.
+ * Note, if no TOC exists for the given Journal File, a File will still be returned but the file
+ * will not actually exist.
+ *
+ * @param journalFile the journal file for which to get the Table of Contents
+ * @return the file that represents the Table of Contents for the specified journal file.
+ */
+ public static File getTocFile(final File journalFile) {
+ final File tocDir = new File(journalFile.getParentFile(), "toc");
+ final String basename = LuceneUtil.substringBefore(journalFile.getName(), ".");
+ final File tocFile = new File(tocDir, basename + ".toc");
+ return tocFile;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
index c678053..38f910f 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
@@ -27,26 +27,24 @@ public interface TocWriter extends Closeable {
/**
* Adds the given block offset as the next Block Offset in the Table of Contents
- * @param offset
- * @throws IOException
+ * @param offset the byte offset at which the block begins
+ * @throws IOException if unable to persist the block index
*/
void addBlockOffset(long offset) throws IOException;
-
+
/**
- * Returns the index of the current Block
- * @return
+ * @return the index of the current Block
*/
int getCurrentBlockIndex();
-
+
/**
- * Returns the file that is currently being written to
- * @return
+ * @return the file that is currently being written to
*/
File getFile();
/**
* Synchronizes the data with the underlying storage device
- * @throws IOException
+ * @throws IOException if unable to synchronize the data with the underlying storage device
*/
void sync() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 5541ab5..7d97bcd 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -75,7 +75,7 @@ public class TestPersistentProvenanceRepository {
private PersistentProvenanceRepository repo;
private RepositoryConfiguration config;
-
+
public static final int DEFAULT_ROLLOVER_MILLIS = 2000;
private RepositoryConfiguration createConfiguration() {
@@ -89,9 +89,9 @@ public class TestPersistentProvenanceRepository {
@BeforeClass
public static void setLogLevel() {
- System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
+ System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
}
-
+
@Before
public void printTestName() {
System.out.println("\n\n\n*********************** " + name.getMethodName() + " *****************************");
@@ -105,33 +105,33 @@ public class TestPersistentProvenanceRepository {
} catch (final IOException ioe) {
}
}
-
+
// Delete all of the storage files. We do this in order to clean up the tons of files that
// we create but also to ensure that we have closed all of the file handles. If we leave any
// streams open, for instance, this will throw an IOException, causing our unit test to fail.
for ( final File storageDir : config.getStorageDirectories() ) {
- int i;
- for (i=0; i < 3; i++) {
- try {
- FileUtils.deleteFile(storageDir, true);
- break;
- } catch (final IOException ioe) {
- // if there is a virus scanner, etc. running in the background we may not be able to
- // delete the file. Wait a sec and try again.
- if ( i == 2 ) {
- throw ioe;
- } else {
- try {
- Thread.sleep(1000L);
- } catch (final InterruptedException ie) {
- }
- }
- }
- }
+ int i;
+ for (i=0; i < 3; i++) {
+ try {
+ FileUtils.deleteFile(storageDir, true);
+ break;
+ } catch (final IOException ioe) {
+ // if there is a virus scanner, etc. running in the background we may not be able to
+ // delete the file. Wait a sec and try again.
+ if ( i == 2 ) {
+ throw ioe;
+ } else {
+ try {
+ Thread.sleep(1000L);
+ } catch (final InterruptedException ie) {
+ }
+ }
+ }
+ }
}
}
-
+
private EventReporter getEventReporter() {
return new EventReporter() {
@@ -241,7 +241,7 @@ public class TestPersistentProvenanceRepository {
}
Thread.sleep(1000L);
-
+
repo.close();
Thread.sleep(500L); // Give the repo time to shutdown (i.e., close all file handles, etc.)
@@ -431,7 +431,7 @@ public class TestPersistentProvenanceRepository {
repo.waitForRollover();
final Query query = new Query(UUID.randomUUID().toString());
-// query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000*"));
+ // query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4"));
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*"));
@@ -905,14 +905,14 @@ public class TestPersistentProvenanceRepository {
secondRepo.initialize(getEventReporter());
try {
- final ProvenanceEventRecord event11 = builder.build();
- secondRepo.registerEvent(event11);
- secondRepo.waitForRollover();
- final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L);
- assertNotNull(event11Retrieved);
- assertEquals(10, event11Retrieved.getEventId());
+ final ProvenanceEventRecord event11 = builder.build();
+ secondRepo.registerEvent(event11);
+ secondRepo.waitForRollover();
+ final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L);
+ assertNotNull(event11Retrieved);
+ assertEquals(10, event11Retrieved.getEventId());
} finally {
- secondRepo.close();
+ secondRepo.close();
}
}
@@ -983,26 +983,26 @@ public class TestPersistentProvenanceRepository {
storageDirFiles = config.getStorageDirectories().get(0).listFiles(indexFileFilter);
assertEquals(0, storageDirFiles.length);
}
-
-
+
+
@Test
public void testBackPressure() throws IOException, InterruptedException {
final RepositoryConfiguration config = createConfiguration();
- config.setMaxEventFileCapacity(1L); // force rollover on each record.
+ config.setMaxEventFileCapacity(1L); // force rollover on each record.
config.setJournalCount(1);
-
+
final AtomicInteger journalCountRef = new AtomicInteger(0);
-
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
- @Override
- protected int getJournalCount() {
- return journalCountRef.get();
- }
- };
+
+ repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
+ @Override
+ protected int getJournalCount() {
+ return journalCountRef.get();
+ }
+ };
repo.initialize(getEventReporter());
- final Map<String, String> attributes = new HashMap<>();
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+ final Map<String, String> attributes = new HashMap<>();
+ final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.setEventTime(System.currentTimeMillis());
builder.setEventType(ProvenanceEventType.RECEIVE);
builder.setTransitUri("nifi://unit-test");
@@ -1023,31 +1023,31 @@ public class TestPersistentProvenanceRepository {
final AtomicLong threadNanos = new AtomicLong(0L);
final Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- final long start = System.nanoTime();
- builder.fromFlowFile(createFlowFile(13, 3000L, attributes));
- attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 13);
- repo.registerEvent(builder.build());
- threadNanos.set(System.nanoTime() - start);
- }
+ @Override
+ public void run() {
+ final long start = System.nanoTime();
+ builder.fromFlowFile(createFlowFile(13, 3000L, attributes));
+ attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 13);
+ repo.registerEvent(builder.build());
+ threadNanos.set(System.nanoTime() - start);
+ }
});
t.start();
Thread.sleep(1500L);
-
+
journalCountRef.set(1);
t.join();
-
+
final int threadMillis = (int) TimeUnit.NANOSECONDS.toMillis(threadNanos.get());
- assertTrue(threadMillis > 1200); // use 1200 to account for the fact that the timing is not exact
-
+ assertTrue(threadMillis > 1200); // use 1200 to account for the fact that the timing is not exact
+
builder.fromFlowFile(createFlowFile(15, 3000L, attributes));
attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 15);
repo.registerEvent(builder.build());
}
-
-
+
+
// TODO: test EOF on merge
// TODO: Test journal with no records
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
index 6f85b94..136f244 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
@@ -40,15 +40,15 @@ import org.junit.Test;
public class TestStandardRecordReaderWriter {
@BeforeClass
public static void setLogLevel() {
- System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
+ System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
}
- private ProvenanceEventRecord createEvent() {
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("filename", "1.txt");
+ private ProvenanceEventRecord createEvent() {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("filename", "1.txt");
attributes.put("uuid", UUID.randomUUID().toString());
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+ final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.setEventTime(System.currentTimeMillis());
builder.setEventType(ProvenanceEventType.RECEIVE);
builder.setTransitUri("nifi://unit-test");
@@ -58,132 +58,132 @@ public class TestStandardRecordReaderWriter {
final ProvenanceEventRecord record = builder.build();
return record;
- }
-
- @Test
- public void testSimpleWriteWithToc() throws IOException {
+ }
+
+ @Test
+ public void testSimpleWriteWithToc() throws IOException {
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite");
final File tocFile = TocUtil.getTocFile(journalFile);
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, false, 1024 * 1024);
-
+
writer.writeHeader();
writer.writeRecord(createEvent(), 1L);
writer.close();
final TocReader tocReader = new StandardTocReader(tocFile);
-
+
try (final FileInputStream fis = new FileInputStream(journalFile);
- final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
- assertEquals(0, reader.getBlockIndex());
- reader.skipToBlock(0);
- StandardProvenanceEventRecord recovered = reader.nextRecord();
- assertNotNull(recovered);
-
- assertEquals("nifi://unit-test", recovered.getTransitUri());
- assertNull(reader.nextRecord());
+ final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+ assertEquals(0, reader.getBlockIndex());
+ reader.skipToBlock(0);
+ StandardProvenanceEventRecord recovered = reader.nextRecord();
+ assertNotNull(recovered);
+
+ assertEquals("nifi://unit-test", recovered.getTransitUri());
+ assertNull(reader.nextRecord());
}
-
+
FileUtils.deleteFile(journalFile.getParentFile(), true);
- }
-
-
- @Test
- public void testSingleRecordCompressed() throws IOException {
+ }
+
+
+ @Test
+ public void testSingleRecordCompressed() throws IOException {
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
final File tocFile = TocUtil.getTocFile(journalFile);
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100);
-
+
writer.writeHeader();
writer.writeRecord(createEvent(), 1L);
writer.close();
final TocReader tocReader = new StandardTocReader(tocFile);
-
+
try (final FileInputStream fis = new FileInputStream(journalFile);
- final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
- assertEquals(0, reader.getBlockIndex());
- reader.skipToBlock(0);
- StandardProvenanceEventRecord recovered = reader.nextRecord();
- assertNotNull(recovered);
-
- assertEquals("nifi://unit-test", recovered.getTransitUri());
- assertNull(reader.nextRecord());
+ final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+ assertEquals(0, reader.getBlockIndex());
+ reader.skipToBlock(0);
+ StandardProvenanceEventRecord recovered = reader.nextRecord();
+ assertNotNull(recovered);
+
+ assertEquals("nifi://unit-test", recovered.getTransitUri());
+ assertNull(reader.nextRecord());
}
-
+
FileUtils.deleteFile(journalFile.getParentFile(), true);
- }
-
-
- @Test
- public void testMultipleRecordsSameBlockCompressed() throws IOException {
+ }
+
+
+ @Test
+ public void testMultipleRecordsSameBlockCompressed() throws IOException {
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
final File tocFile = TocUtil.getTocFile(journalFile);
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
// new record each 1 MB of uncompressed data
final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 1024 * 1024);
-
+
writer.writeHeader();
for (int i=0; i < 10; i++) {
- writer.writeRecord(createEvent(), i);
+ writer.writeRecord(createEvent(), i);
}
writer.close();
final TocReader tocReader = new StandardTocReader(tocFile);
-
+
try (final FileInputStream fis = new FileInputStream(journalFile);
- final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
- for (int i=0; i < 10; i++) {
- assertEquals(0, reader.getBlockIndex());
-
- // call skipToBlock half the time to ensure that we can; avoid calling it
- // the other half of the time to ensure that it's okay.
- if (i <= 5) {
- reader.skipToBlock(0);
- }
-
- StandardProvenanceEventRecord recovered = reader.nextRecord();
- assertNotNull(recovered);
- assertEquals("nifi://unit-test", recovered.getTransitUri());
- }
-
- assertNull(reader.nextRecord());
+ final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+ for (int i=0; i < 10; i++) {
+ assertEquals(0, reader.getBlockIndex());
+
+ // call skipToBlock half the time to ensure that we can; avoid calling it
+ // the other half of the time to ensure that it's okay.
+ if (i <= 5) {
+ reader.skipToBlock(0);
+ }
+
+ StandardProvenanceEventRecord recovered = reader.nextRecord();
+ assertNotNull(recovered);
+ assertEquals("nifi://unit-test", recovered.getTransitUri());
+ }
+
+ assertNull(reader.nextRecord());
}
-
+
FileUtils.deleteFile(journalFile.getParentFile(), true);
- }
-
-
- @Test
- public void testMultipleRecordsMultipleBlocksCompressed() throws IOException {
+ }
+
+
+ @Test
+ public void testMultipleRecordsMultipleBlocksCompressed() throws IOException {
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
final File tocFile = TocUtil.getTocFile(journalFile);
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
// new block each 10 bytes
final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100);
-
+
writer.writeHeader();
for (int i=0; i < 10; i++) {
- writer.writeRecord(createEvent(), i);
+ writer.writeRecord(createEvent(), i);
}
writer.close();
final TocReader tocReader = new StandardTocReader(tocFile);
-
+
try (final FileInputStream fis = new FileInputStream(journalFile);
- final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
- for (int i=0; i < 10; i++) {
- StandardProvenanceEventRecord recovered = reader.nextRecord();
- System.out.println(recovered);
- assertNotNull(recovered);
- assertEquals((long) i, recovered.getEventId());
- assertEquals("nifi://unit-test", recovered.getTransitUri());
- }
-
- assertNull(reader.nextRecord());
+ final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+ for (int i=0; i < 10; i++) {
+ StandardProvenanceEventRecord recovered = reader.nextRecord();
+ System.out.println(recovered);
+ assertNotNull(recovered);
+ assertEquals((long) i, recovered.getEventId());
+ assertEquals("nifi://unit-test", recovered.getTransitUri());
+ }
+
+ assertNull(reader.nextRecord());
}
-
+
FileUtils.deleteFile(journalFile.getParentFile(), true);
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
index 7459fe8..eb0f736 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
@@ -24,7 +24,7 @@ import java.util.Set;
import org.apache.nifi.flowfile.FlowFile;
public class TestUtil {
- public static FlowFile createFlowFile(final long id, final long fileSize, final Map<String, String> attributes) {
+ public static FlowFile createFlowFile(final long id, final long fileSize, final Map<String, String> attributes) {
final Map<String, String> attrCopy = new HashMap<>(attributes);
return new FlowFile() {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
index 30326e7..87400a0 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
@@ -38,7 +38,7 @@ public class TestStandardTocReader {
out.write(0);
out.write(0);
}
-
+
try {
try(final StandardTocReader reader = new StandardTocReader(file)) {
assertFalse(reader.isCompressed());
@@ -46,13 +46,13 @@ public class TestStandardTocReader {
} finally {
file.delete();
}
-
-
+
+
try (final OutputStream out = new FileOutputStream(file)) {
out.write(0);
out.write(1);
}
-
+
try {
try(final StandardTocReader reader = new StandardTocReader(file)) {
assertTrue(reader.isCompressed());
@@ -61,25 +61,25 @@ public class TestStandardTocReader {
file.delete();
}
}
-
-
+
+
@Test
public void testGetBlockIndex() throws IOException {
final File file = new File("target/" + UUID.randomUUID().toString());
try (final OutputStream out = new FileOutputStream(file);
- final DataOutputStream dos = new DataOutputStream(out)) {
+ final DataOutputStream dos = new DataOutputStream(out)) {
out.write(0);
out.write(0);
-
+
for (int i=0; i < 1024; i++) {
dos.writeLong(i * 1024L);
}
}
-
+
try {
try(final StandardTocReader reader = new StandardTocReader(file)) {
assertFalse(reader.isCompressed());
-
+
for (int i=0; i < 1024; i++) {
assertEquals(i * 1024, reader.getBlockOffset(i));
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
index 70f55a2..aebe0d5 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
@@ -31,12 +31,12 @@ public class TestStandardTocWriter {
final File tocFile = new File("target/" + UUID.randomUUID().toString() + ".toc");
try {
assertTrue( tocFile.createNewFile() );
-
+
try (final StandardTocWriter writer = new StandardTocWriter(tocFile, false, false)) {
}
} finally {
FileUtils.deleteFile(tocFile, false);
}
}
-
+
}