Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/27 18:04:46 UTC
[01/10] incubator-nifi git commit: NIFI-271: Added additional test resources to RAT exclusions
Repository: incubator-nifi
Updated Branches:
refs/heads/develop d29a2d688 -> 666de3d41
NIFI-271: Added additional test resources to RAT exclusions
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/b097a530
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/b097a530
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/b097a530
Branch: refs/heads/develop
Commit: b097a530760398d18354ff87ba4bf17bfa384899
Parents: 953d9e5
Author: Mark Payne <ma...@hotmail.com>
Authored: Sat Apr 25 12:01:45 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Sat Apr 25 12:01:45 2015 -0400
----------------------------------------------------------------------
.../nifi-framework/nifi-framework-core/pom.xml | 1 +
.../nifi-standard-bundle/nifi-standard-processors/pom.xml | 3 +++
2 files changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b097a530/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index a5054e4..25c396f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -131,6 +131,7 @@
<exclude>src/test/resources/conf/0bytes.xml</exclude>
<exclude>src/test/resources/conf/termination-only.xml</exclude>
<exclude>src/test/resources/hello.txt</exclude>
+ <exclude>src/test/resources/old-swap-file.swap</exclude>
</excludes>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b097a530/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index c3c05df..be0fc67 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -176,6 +176,7 @@
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
+ <exclude>src/test/resources/localhost.cer</exclude>
<exclude>src/test/resources/hello.txt</exclude>
<exclude>src/test/resources/CharacterSetConversionSamples/Converted.txt</exclude>
<exclude>src/test/resources/CharacterSetConversionSamples/Original.txt</exclude>
@@ -231,6 +232,8 @@
<exclude>src/test/resources/TestTransformXml/tokens.xml</exclude>
<exclude>src/test/resources/TestUnpackContent/folder/cal.txt</exclude>
<exclude>src/test/resources/TestUnpackContent/folder/date.txt</exclude>
+ <exclude>src/test/resources/TestUnpackContent/data.flowfilev2</exclude>
+ <exclude>src/test/resources/TestUnpackContent/data.flowfilev3</exclude>
<exclude>src/test/resources/TestXml/xml-bundle-1</exclude>
<exclude>src/test/resources/CompressedData/SampleFile.txt.bz2</exclude>
<exclude>src/test/resources/CompressedData/SampleFile.txt.gz</exclude>
[05/10] incubator-nifi git commit: NIFI-527: Merging develop
Posted by ma...@apache.org.
NIFI-527: Merging develop
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a5ac48a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a5ac48a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a5ac48a0
Branch: refs/heads/develop
Commit: a5ac48a03c362dcb0b253741157d79e8791eb2d5
Parents: f442d55
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Apr 27 09:52:33 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Apr 27 09:52:33 2015 -0400
----------------------------------------------------------------------
.../PersistentProvenanceRepository.java | 592 ++++++++++---------
.../provenance/RepositoryConfiguration.java | 14 +-
.../nifi/provenance/StandardRecordReader.java | 223 ++++++-
.../nifi/provenance/StandardRecordWriter.java | 114 +++-
.../provenance/lucene/DeleteIndexAction.java | 75 +--
.../nifi/provenance/lucene/DocsReader.java | 100 +++-
.../nifi/provenance/lucene/FieldNames.java | 1 +
.../nifi/provenance/lucene/IndexManager.java | 467 +++++++++++++++
.../nifi/provenance/lucene/IndexSearch.java | 71 ++-
.../nifi/provenance/lucene/IndexingAction.java | 183 +++---
.../nifi/provenance/lucene/LuceneUtil.java | 26 +-
.../provenance/serialization/RecordReader.java | 67 +++
.../provenance/serialization/RecordReaders.java | 139 ++---
.../provenance/serialization/RecordWriter.java | 6 +
.../provenance/serialization/RecordWriters.java | 13 +-
.../nifi/provenance/toc/StandardTocReader.java | 108 ++++
.../nifi/provenance/toc/StandardTocWriter.java | 120 ++++
.../apache/nifi/provenance/toc/TocReader.java | 58 ++
.../org/apache/nifi/provenance/toc/TocUtil.java | 37 ++
.../apache/nifi/provenance/toc/TocWriter.java | 52 ++
.../TestPersistentProvenanceRepository.java | 175 +++---
.../TestStandardRecordReaderWriter.java | 189 ++++++
.../org/apache/nifi/provenance/TestUtil.java | 82 +++
.../provenance/toc/TestStandardTocReader.java | 91 +++
.../provenance/toc/TestStandardTocWriter.java | 42 ++
25 files changed, 2426 insertions(+), 619 deletions(-)
----------------------------------------------------------------------
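The bulk of this merge introduces a Table-of-Contents (TOC) layer for provenance event logs: as events are written, the byte offset at which each block begins is recorded, so a reader can later jump straight to the block containing a given event instead of scanning the whole (possibly compressed) file. A minimal sketch of that block-offset idea follows; the class name is hypothetical, and the real StandardTocWriter/StandardTocReader added below also persist the offsets to a .toc file on disk.

import java.util.ArrayList;
import java.util.List;

// Minimal in-memory sketch of the TOC block-offset idea (hypothetical class).
class BlockIndex {
    private final List<Long> blockOffsets = new ArrayList<>();

    // Called by the writer each time a new block begins.
    void addBlockOffset(final long byteOffset) {
        blockOffsets.add(byteOffset);
    }

    // Byte offset where the given block starts, or -1 if unknown.
    long getBlockOffset(final int blockIndex) {
        return blockIndex < blockOffsets.size() ? blockOffsets.get(blockIndex) : -1L;
    }

    // Index of the last block that starts at or before the given byte offset.
    int getBlockIndex(final long byteOffset) {
        int index = 0;
        for (int i = 0; i < blockOffsets.size(); i++) {
            if (blockOffsets.get(i) <= byteOffset) {
                index = i;
            }
        }
        return index;
    }
}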
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 0502cc7..48cc164 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -21,7 +21,6 @@ import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -58,6 +57,14 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexNotFoundException;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.FSDirectory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.provenance.expiration.ExpirationAction;
@@ -67,12 +74,11 @@ import org.apache.nifi.provenance.lineage.Lineage;
import org.apache.nifi.provenance.lineage.LineageComputationType;
import org.apache.nifi.provenance.lucene.DeleteIndexAction;
import org.apache.nifi.provenance.lucene.FieldNames;
+import org.apache.nifi.provenance.lucene.IndexManager;
import org.apache.nifi.provenance.lucene.IndexSearch;
import org.apache.nifi.provenance.lucene.IndexingAction;
import org.apache.nifi.provenance.lucene.LineageQuery;
import org.apache.nifi.provenance.lucene.LuceneUtil;
-import org.apache.nifi.provenance.rollover.CompressionAction;
-import org.apache.nifi.provenance.rollover.RolloverAction;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QueryResult;
import org.apache.nifi.provenance.search.QuerySubmission;
@@ -81,18 +87,12 @@ import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordReaders;
import org.apache.nifi.provenance.serialization.RecordWriter;
import org.apache.nifi.provenance.serialization.RecordWriters;
+import org.apache.nifi.provenance.toc.TocUtil;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.RingBuffer;
import org.apache.nifi.util.StopWatch;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexNotFoundException;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.search.TopDocs;
-import org.apache.lucene.store.FSDirectory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,7 +102,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
public static final String EVENT_CATEGORY = "Provenance Repository";
private static final String FILE_EXTENSION = ".prov";
private static final String TEMP_FILE_SUFFIX = ".prov.part";
- public static final int SERIALIZATION_VERSION = 7;
+ public static final int SERIALIZATION_VERSION = 8;
public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
public static final Pattern INDEX_PATTERN = Pattern.compile("index-\\d+");
public static final Pattern LOG_FILENAME_PATTERN = Pattern.compile("(\\d+).*\\.prov");
@@ -129,14 +129,14 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
private final AtomicLong streamStartTime = new AtomicLong(System.currentTimeMillis());
private final RepositoryConfiguration configuration;
private final IndexConfiguration indexConfig;
+ private final IndexManager indexManager;
private final boolean alwaysSync;
private final int rolloverCheckMillis;
private final ScheduledExecutorService scheduledExecService;
- private final ExecutorService rolloverExecutor;
+ private final ScheduledExecutorService rolloverExecutor;
private final ExecutorService queryExecService;
- private final List<RolloverAction> rolloverActions = new ArrayList<>();
private final List<ExpirationAction> expirationActions = new ArrayList<>();
private final IndexingAction indexingAction;
@@ -181,22 +181,18 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
this.maxPartitionMillis = configuration.getMaxEventFileLife(TimeUnit.MILLISECONDS);
this.maxPartitionBytes = configuration.getMaxEventFileCapacity();
this.indexConfig = new IndexConfiguration(configuration);
+ this.indexManager = new IndexManager();
this.alwaysSync = configuration.isAlwaysSync();
this.rolloverCheckMillis = rolloverCheckMillis;
final List<SearchableField> fields = configuration.getSearchableFields();
if (fields != null && !fields.isEmpty()) {
indexingAction = new IndexingAction(this, indexConfig);
- rolloverActions.add(indexingAction);
} else {
indexingAction = null;
}
- if (configuration.isCompressOnRollover()) {
- rolloverActions.add(new CompressionAction());
- }
-
- scheduledExecService = Executors.newScheduledThreadPool(3);
+ scheduledExecService = Executors.newScheduledThreadPool(3, new NamedThreadFactory("Provenance Maintenance Thread"));
queryExecService = Executors.newFixedThreadPool(configuration.getQueryThreadPoolSize(), new NamedThreadFactory("Provenance Query Thread"));
// The number of rollover threads is a little bit arbitrary but comes from the idea that multiple storage directories generally
@@ -204,69 +200,74 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
// disks efficiently. However, the rollover actions can be somewhat CPU intensive, so we double the number of threads in order
// to account for that.
final int numRolloverThreads = configuration.getStorageDirectories().size() * 2;
- rolloverExecutor = Executors.newFixedThreadPool(numRolloverThreads, new NamedThreadFactory("Provenance Repository Rollover Thread"));
+ rolloverExecutor = Executors.newScheduledThreadPool(numRolloverThreads, new NamedThreadFactory("Provenance Repository Rollover Thread"));
}
@Override
public void initialize(final EventReporter eventReporter) throws IOException {
- if (initialized.getAndSet(true)) {
- return;
- }
-
- this.eventReporter = eventReporter;
-
- recover();
-
- if (configuration.isAllowRollover()) {
- writers = createWriters(configuration, idGenerator.get());
- }
-
- if (configuration.isAllowRollover()) {
- scheduledExecService.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- // Check if we need to roll over
- if (needToRollover()) {
- // it appears that we do need to roll over. Obtain write lock so that we can do so, and then
- // confirm that we still need to.
- writeLock.lock();
- try {
- logger.debug("Obtained write lock to perform periodic rollover");
-
- if (needToRollover()) {
- try {
- rollover(false);
- } catch (final Exception e) {
- logger.error("Failed to roll over Provenance Event Log due to {}", e.toString());
- logger.error("", e);
- }
- }
- } finally {
- writeLock.unlock();
- }
- }
- }
- }, rolloverCheckMillis, rolloverCheckMillis, TimeUnit.MILLISECONDS);
-
- scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS);
- scheduledExecService.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- try {
- purgeOldEvents();
- } catch (final Exception e) {
- logger.error("Failed to purge old events from Provenance Repo due to {}", e.toString());
- if (logger.isDebugEnabled()) {
- logger.error("", e);
- }
- eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to purge old events from Provenance Repo due to " + e.toString());
- }
- }
- }, 1L, 1L, TimeUnit.MINUTES);
-
- expirationActions.add(new DeleteIndexAction(this, indexConfig));
- expirationActions.add(new FileRemovalAction());
- }
+ writeLock.lock();
+ try {
+ if (initialized.getAndSet(true)) {
+ return;
+ }
+
+ this.eventReporter = eventReporter;
+
+ recover();
+
+ if (configuration.isAllowRollover()) {
+ writers = createWriters(configuration, idGenerator.get());
+ }
+
+ if (configuration.isAllowRollover()) {
+ scheduledExecService.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ // Check if we need to roll over
+ if (needToRollover()) {
+ // it appears that we do need to roll over. Obtain write lock so that we can do so, and then
+ // confirm that we still need to.
+ writeLock.lock();
+ try {
+ logger.debug("Obtained write lock to perform periodic rollover");
+
+ if (needToRollover()) {
+ try {
+ rollover(false);
+ } catch (final Exception e) {
+ logger.error("Failed to roll over Provenance Event Log due to {}", e.toString());
+ logger.error("", e);
+ }
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+ }
+ }, rolloverCheckMillis, rolloverCheckMillis, TimeUnit.MILLISECONDS);
+
+ scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS);
+ scheduledExecService.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ purgeOldEvents();
+ } catch (final Exception e) {
+ logger.error("Failed to purge old events from Provenance Repo due to {}", e.toString());
+ if (logger.isDebugEnabled()) {
+ logger.error("", e);
+ }
+ eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to purge old events from Provenance Repo due to " + e.toString());
+ }
+ }
+ }, 1L, 1L, TimeUnit.MINUTES);
+
+ expirationActions.add(new DeleteIndexAction(this, indexConfig, indexManager));
+ expirationActions.add(new FileRemovalAction());
+ }
+ } finally {
+ writeLock.unlock();
+ }
}
private static RepositoryConfiguration createRepositoryConfiguration() throws IOException {
@@ -334,10 +335,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
final File journalDirectory = new File(storageDirectory, "journals");
final File journalFile = new File(journalDirectory, String.valueOf(initialRecordId) + ".journal." + i);
- writers[i] = RecordWriters.newRecordWriter(journalFile);
+ writers[i] = RecordWriters.newRecordWriter(journalFile, false, false);
writers[i].writeHeader();
}
+ logger.info("Created new Provenance Event Writers for events starting with ID {}", initialRecordId);
return writers;
}
@@ -501,18 +503,15 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
// Determine the max ID in the last file.
try (final RecordReader reader = RecordReaders.newRecordReader(maxIdFile, getAllLogFiles())) {
- ProvenanceEventRecord record;
- while ((record = reader.nextRecord()) != null) {
- final long eventId = record.getEventId();
- if (eventId > maxId) {
- maxId = eventId;
- }
+ final long eventId = reader.getMaxEventId();
+ if (eventId > maxId) {
+ maxId = eventId;
+ }
- // If the ID is greater than the max indexed id and this file was indexed, then
- // update the max indexed id
- if (eventId > maxIndexedId && lastFileIndexed) {
- maxIndexedId = eventId;
- }
+ // If the ID is greater than the max indexed id and this file was indexed, then
+ // update the max indexed id
+ if (eventId > maxIndexedId && lastFileIndexed) {
+ maxIndexedId = eventId;
}
} catch (final IOException ioe) {
logger.error("Failed to read Provenance Event File {} due to {}", maxIdFile, ioe);
@@ -568,16 +567,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
// Read the records in the last file to find its max id
if (greatestMinIdFile != null) {
try (final RecordReader recordReader = RecordReaders.newRecordReader(greatestMinIdFile, Collections.<Path>emptyList())) {
- StandardProvenanceEventRecord record;
-
- try {
- while ((record = recordReader.nextRecord()) != null) {
- if (record.getEventId() > maxId) {
- maxId = record.getEventId();
- }
- }
- } catch (final EOFException eof) {
- }
+ maxId = recordReader.getMaxEventId();
}
}
@@ -599,46 +589,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
logger.info("Recovered {} records", recordsRecovered);
-
- final List<RolloverAction> rolloverActions = this.rolloverActions;
- final Runnable retroactiveRollover = new Runnable() {
- @Override
- public void run() {
- for (File toRecover : filesToRecover) {
- final String baseFileName = LuceneUtil.substringBefore(toRecover.getName(), ".");
- final Long fileFirstEventId = Long.parseLong(baseFileName);
-
- for (final RolloverAction action : rolloverActions) {
- if (!action.hasBeenPerformed(toRecover)) {
- try {
- final StopWatch stopWatch = new StopWatch(true);
-
- toRecover = action.execute(toRecover);
-
- stopWatch.stop();
- final String duration = stopWatch.getDuration();
- logger.info("Successfully performed retroactive action {} against {} in {}", action, toRecover, duration);
-
- // update our map of id to Path
- final Map<Long, Path> updatedMap = addToPathMap(fileFirstEventId, toRecover.toPath());
- logger.trace("After retroactive rollover action {}, Path Map: {}", action, updatedMap);
- } catch (final Exception e) {
- logger.error("Failed to perform retroactive rollover actions on {} due to {}", toRecover, e.toString());
- logger.error("", e);
- eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to perform retroactive rollover actions on " + toRecover + " due to " + e.toString());
- }
- }
- }
- }
- }
- };
- rolloverExecutor.submit(retroactiveRollover);
-
recoveryFinished.set(true);
}
@Override
- public void close() throws IOException {
+ public synchronized void close() throws IOException {
writeLock.lock();
try {
logger.debug("Obtained write lock for close");
@@ -648,8 +603,12 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
rolloverExecutor.shutdownNow();
queryExecService.shutdownNow();
- for (final RecordWriter writer : writers) {
- writer.close();
+ indexManager.close();
+
+ if ( writers != null ) {
+ for (final RecordWriter writer : writers) {
+ writer.close();
+ }
}
} finally {
writeLock.unlock();
@@ -945,6 +904,21 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
}
+ // made protected for testing purposes
+ protected int getJournalCount() {
+ // determine how many 'journals' we have in the journals directories
+ int journalFileCount = 0;
+ for ( final File storageDir : configuration.getStorageDirectories() ) {
+ final File journalsDir = new File(storageDir, "journals");
+ final File[] journalFiles = journalsDir.listFiles();
+ if ( journalFiles != null ) {
+ journalFileCount += journalFiles.length;
+ }
+ }
+
+ return journalFileCount;
+ }
+
/**
* MUST be called with the write lock held
*
@@ -963,9 +937,45 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
for (final RecordWriter writer : writers) {
final File writerFile = writer.getFile();
journalsToMerge.add(writerFile);
- writer.close();
+ try {
+ writer.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close {} due to {}", writer, ioe.toString());
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ }
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
}
+ int journalFileCount = getJournalCount();
+ final int journalCountThreshold = configuration.getJournalCount() * 5;
+ if ( journalFileCount > journalCountThreshold ) {
+ logger.warn("The rate of the dataflow is exceeding the provenance recording rate. "
+ + "Slowing down flow to accomodate. Currently, there are {} journal files and "
+ + "threshold for blocking is {}", journalFileCount, journalCountThreshold);
+ eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is "
+ + "exceeding the provenance recording rate. Slowing down flow to accomodate");
+
+ while (journalFileCount > journalCountThreshold) {
+ try {
+ Thread.sleep(1000L);
+ } catch (final InterruptedException ie) {
+ }
+
+ logger.debug("Provenance Repository is still behind. Keeping flow slowed down "
+ + "to accomodate. Currently, there are {} journal files and "
+ + "threshold for blocking is {}", journalFileCount, journalCountThreshold);
+
+ journalFileCount = getJournalCount();
+ }
+
+ logger.info("Provenance Repository has no caught up with rolling over journal files. Current number of "
+ + "journal files to be rolled over is {}", journalFileCount);
+ }
+
writers = createWriters(configuration, idGenerator.get());
streamStartTime.set(System.currentTimeMillis());
recordsWrittenSinceRollover.getAndSet(0);
@@ -974,60 +984,29 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
final List<File> storageDirs = configuration.getStorageDirectories();
final File storageDir = storageDirs.get((int) (storageDirIdx % storageDirs.size()));
- final List<RolloverAction> actions = rolloverActions;
+ final AtomicReference<Future<?>> futureReference = new AtomicReference<>();
final int recordsWritten = recordsWrittenSinceRollover.getAndSet(0);
final Runnable rolloverRunnable = new Runnable() {
@Override
public void run() {
- final File fileRolledOver;
-
- try {
- fileRolledOver = mergeJournals(journalsToMerge, storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, latestRecords);
- repoDirty.set(false);
- } catch (final IOException ioe) {
- repoDirty.set(true);
- logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
- logger.error("", ioe);
- return;
- }
-
- if (fileRolledOver == null) {
- return;
- }
- File file = fileRolledOver;
-
- for (final RolloverAction action : actions) {
- try {
- final StopWatch stopWatch = new StopWatch(true);
- file = action.execute(file);
- stopWatch.stop();
- logger.info("Successfully performed Rollover Action {} for {} in {}", action, file, stopWatch.getDuration());
-
- // update our map of id to Path
- // need lock to update the map, even though it's an AtomicReference, AtomicReference allows those doing a
- // get() to obtain the most up-to-date version but we use a writeLock to prevent multiple threads modifying
- // it at one time
- writeLock.lock();
- try {
- final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
- SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator());
- newIdToPathMap.putAll(idToPathMap.get());
- newIdToPathMap.put(fileFirstEventId, file.toPath());
- idToPathMap.set(newIdToPathMap);
- logger.trace("After rollover action {}, path map: {}", action, newIdToPathMap);
- } finally {
- writeLock.unlock();
- }
- } catch (final Throwable t) {
- logger.error("Failed to perform Rollover Action {} for {}: got Exception {}",
- action, fileRolledOver, t.toString());
- logger.error("", t);
-
- return;
- }
- }
-
- if (actions.isEmpty()) {
+ try {
+ final File fileRolledOver;
+
+ try {
+ fileRolledOver = mergeJournals(journalsToMerge, storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, latestRecords);
+ repoDirty.set(false);
+ } catch (final IOException ioe) {
+ repoDirty.set(true);
+ logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
+ logger.error("", ioe);
+ return;
+ }
+
+ if (fileRolledOver == null) {
+ return;
+ }
+ File file = fileRolledOver;
+
// update our map of id to Path
// need lock to update the map, even though it's an AtomicReference, AtomicReference allows those doing a
// get() to obtain the most up-to-date version but we use a writeLock to prevent multiple threads modifying
@@ -1042,35 +1021,37 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
} finally {
writeLock.unlock();
}
- }
-
- logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
- rolloverCompletions.getAndIncrement();
+
+ logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
+ rolloverCompletions.getAndIncrement();
+
+ // We have finished successfully. Cancel the future so that we don't run anymore
+ Future<?> future;
+ while ((future = futureReference.get()) == null) {
+ try {
+ Thread.sleep(10L);
+ } catch (final InterruptedException ie) {
+ }
+ }
+
+ future.cancel(false);
+ } catch (final Throwable t) {
+ logger.error("Failed to rollover Provenance repository due to {}", t.toString());
+ logger.error("", t);
+ }
}
};
- rolloverExecutor.submit(rolloverRunnable);
+ // We are going to schedule the future to run every 10 seconds. This allows us to keep retrying if we
+ // fail for some reason. When we succeed, the Runnable will cancel itself.
+ final Future<?> future = rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, 10, TimeUnit.SECONDS);
+ futureReference.set(future);
streamStartTime.set(System.currentTimeMillis());
bytesWrittenSinceRollover.set(0);
}
}
- private SortedMap<Long, Path> addToPathMap(final Long firstEventId, final Path path) {
- SortedMap<Long, Path> unmodifiableMap;
- boolean updated = false;
- do {
- final SortedMap<Long, Path> existingMap = idToPathMap.get();
- final SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator());
- newIdToPathMap.putAll(existingMap);
- newIdToPathMap.put(firstEventId, path);
- unmodifiableMap = Collections.unmodifiableSortedMap(newIdToPathMap);
-
- updated = idToPathMap.compareAndSet(existingMap, unmodifiableMap);
- } while (!updated);
-
- return unmodifiableMap;
- }
private Set<File> recoverJournalFiles() throws IOException {
if (!configuration.isAllowRollover()) {
@@ -1093,6 +1074,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
for (final File journalFile : journalFiles) {
+ if ( journalFile.isDirectory() ) {
+ continue;
+ }
+
final String basename = LuceneUtil.substringBefore(journalFile.getName(), ".");
List<File> files = journalMap.get(basename);
if (files == null) {
@@ -1135,22 +1120,92 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
return mergedFile;
}
- static File mergeJournals(final List<File> journalFiles, final File storageDir, final File mergedFile, final EventReporter eventReporter, final RingBuffer<ProvenanceEventRecord> ringBuffer) throws IOException {
- final long startNanos = System.nanoTime();
+ File mergeJournals(final List<File> journalFiles, final File storageDir, final File mergedFile, final EventReporter eventReporter, final RingBuffer<ProvenanceEventRecord> ringBuffer) throws IOException {
+ logger.debug("Merging {} to {}", journalFiles, mergedFile);
+ if ( this.closed ) {
+ logger.info("Provenance Repository has been closed; will not merge journal files to {}", mergedFile);
+ return null;
+ }
+
if (journalFiles.isEmpty()) {
return null;
}
- if (mergedFile.exists()) {
- throw new FileAlreadyExistsException("Cannot Merge " + journalFiles.size() + " Journal Files into Merged Provenance Log File " + mergedFile.getAbsolutePath() + " because the Merged File already exists");
+ Collections.sort(journalFiles, new Comparator<File>() {
+ @Override
+ public int compare(final File o1, final File o2) {
+ final String suffix1 = LuceneUtil.substringAfterLast(o1.getName(), ".");
+ final String suffix2 = LuceneUtil.substringAfterLast(o2.getName(), ".");
+
+ try {
+ final int journalIndex1 = Integer.parseInt(suffix1);
+ final int journalIndex2 = Integer.parseInt(suffix2);
+ return Integer.compare(journalIndex1, journalIndex2);
+ } catch (final NumberFormatException nfe) {
+ return o1.getName().compareTo(o2.getName());
+ }
+ }
+ });
+
+ final String firstJournalFile = journalFiles.get(0).getName();
+ final String firstFileSuffix = LuceneUtil.substringAfterLast(firstJournalFile, ".");
+ final boolean allPartialFiles = firstFileSuffix.equals("0");
+
+ // check if we have all of the "partial" files for the journal.
+ if (allPartialFiles) {
+ if ( mergedFile.exists() ) {
+ // we have all "partial" files and there is already a merged file. Delete the data from the index
+ // because the merge file may not be fully merged. We will re-merge.
+ logger.warn("Merged Journal File {} already exists; however, all partial journal files also exist "
+ + "so assuming that the merge did not finish. Repeating procedure in order to ensure consistency.");
+
+ final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, indexManager);
+ try {
+ deleteAction.execute(mergedFile);
+ } catch (final Exception e) {
+ logger.warn("Failed to delete records from Journal File {} from the index; this could potentially result in duplicates. Failure was due to {}", mergedFile, e.toString());
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", e);
+ }
+ }
+
+ // Since we only store the file's basename, block offset, and event ID, and because the newly created file could end up on
+ // a different Storage Directory than the original, we need to ensure that we delete both the partially merged
+ // file and the TOC file. Otherwise, we could get the wrong copy and have issues retrieving events.
+ if ( !mergedFile.delete() ) {
+ logger.error("Failed to delete partially written Provenance Journal File {}. This may result in events from this journal "
+ + "file not being able to be displayed. This file should be deleted manually.", mergedFile);
+ }
+
+ final File tocFile = TocUtil.getTocFile(mergedFile);
+ if ( tocFile.exists() && !tocFile.delete() ) {
+ logger.error("Failed to delete .toc file {}; this may result in not being able to read the Provenance Events from the {} Journal File. "
+ + "This can be corrected by manually deleting the {} file", tocFile, mergedFile, tocFile);
+ }
+ }
+ } else {
+ logger.warn("Cannot merge journal files {} because expected first file to end with extension '.0' "
+ + "but it did not; assuming that the files were already merged but only some finished deletion "
+ + "before restart. Deleting remaining partial journal files.", journalFiles);
+
+ for ( final File file : journalFiles ) {
+ if ( !file.delete() && file.exists() ) {
+ logger.warn("Failed to delete unneeded journal file {}; this file should be cleaned up manually", file);
+ }
+ }
+
+ return null;
}
-
- final File tempMergedFile = new File(mergedFile.getParentFile(), mergedFile.getName() + ".part");
+
+ final long startNanos = System.nanoTime();
// Map each journal to a RecordReader
final List<RecordReader> readers = new ArrayList<>();
int records = 0;
+ final boolean isCompress = configuration.isCompressOnRollover();
+ final File writerFile = isCompress ? new File(mergedFile.getParentFile(), mergedFile.getName() + ".gz") : mergedFile;
+
try {
for (final File journalFile : journalFiles) {
try {
@@ -1203,32 +1258,50 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
// loop over each entry in the map, persisting the records to the merged file in order, and populating the map
// with the next entry from the journal file from which the previous record was written.
- try (final RecordWriter writer = RecordWriters.newRecordWriter(tempMergedFile)) {
+ try (final RecordWriter writer = RecordWriters.newRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) {
writer.writeHeader();
- while (!recordToReaderMap.isEmpty()) {
- final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
- final StandardProvenanceEventRecord record = entry.getKey();
- final RecordReader reader = entry.getValue();
-
- writer.writeRecord(record, record.getEventId());
- ringBuffer.add(record);
- records++;
-
- // Remove this entry from the map
- recordToReaderMap.remove(record);
-
- // Get the next entry from this reader and add it to the map
- StandardProvenanceEventRecord nextRecord = null;
-
- try {
- nextRecord = reader.nextRecord();
- } catch (final EOFException eof) {
- }
-
- if (nextRecord != null) {
- recordToReaderMap.put(nextRecord, reader);
- }
+ final IndexingAction indexingAction = new IndexingAction(this, indexConfig);
+
+ final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile);
+ final IndexWriter indexWriter = indexManager.borrowIndexWriter(indexingDirectory);
+ try {
+ long maxId = 0L;
+
+ while (!recordToReaderMap.isEmpty()) {
+ final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
+ final StandardProvenanceEventRecord record = entry.getKey();
+ final RecordReader reader = entry.getValue();
+
+ writer.writeRecord(record, record.getEventId());
+ final int blockIndex = writer.getTocWriter().getCurrentBlockIndex();
+
+ indexingAction.index(record, indexWriter, blockIndex);
+ maxId = record.getEventId();
+
+ ringBuffer.add(record);
+ records++;
+
+ // Remove this entry from the map
+ recordToReaderMap.remove(record);
+
+ // Get the next entry from this reader and add it to the map
+ StandardProvenanceEventRecord nextRecord = null;
+
+ try {
+ nextRecord = reader.nextRecord();
+ } catch (final EOFException eof) {
+ }
+
+ if (nextRecord != null) {
+ recordToReaderMap.put(nextRecord, reader);
+ }
+ }
+
+ indexWriter.commit();
+ indexConfig.setMaxIdIndexed(maxId);
+ } finally {
+ indexManager.returnIndexWriter(indexingDirectory, indexWriter);
}
}
} finally {
@@ -1240,37 +1313,22 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
}
- // Attempt to rename. Keep trying for a bit if we fail. This happens often if we have some external process
- // that locks files, such as a virus scanner.
- boolean renamed = false;
- for (int i = 0; i < 10 && !renamed; i++) {
- renamed = tempMergedFile.renameTo(mergedFile);
- if (!renamed) {
- try {
- Thread.sleep(100L);
- } catch (final InterruptedException ie) {
- }
- }
- }
-
- if (!renamed) {
- throw new IOException("Failed to merge journal files into single merged file " + mergedFile.getAbsolutePath() + " because " + tempMergedFile.getAbsolutePath() + " could not be renamed");
- }
-
// Success. Remove all of the journal files, as they're no longer needed, now that they've been merged.
for (final File journalFile : journalFiles) {
- if (!journalFile.delete()) {
- if (journalFile.exists()) {
- logger.warn("Failed to remove temporary journal file {}; this file should be cleaned up manually", journalFile.getAbsolutePath());
- eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal file " + journalFile.getAbsolutePath() + "; this file should be cleaned up manually");
- } else {
- logger.warn("Failed to remove temporary journal file {} because it no longer exists", journalFile.getAbsolutePath());
- }
+ if (!journalFile.delete() && journalFile.exists()) {
+ logger.warn("Failed to remove temporary journal file {}; this file should be cleaned up manually", journalFile.getAbsolutePath());
+ eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal file " + journalFile.getAbsolutePath() + "; this file should be cleaned up manually");
+ }
+
+ final File tocFile = TocUtil.getTocFile(journalFile);
+ if (!tocFile.delete() && tocFile.exists()) {
+ logger.warn("Failed to remove temporary journal TOC file {}; this file should be cleaned up manually", tocFile.getAbsolutePath());
+ eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal TOC file " + tocFile.getAbsolutePath() + "; this file should be cleaned up manually");
}
}
if (records == 0) {
- mergedFile.delete();
+ writerFile.delete();
return null;
} else {
final long nanos = System.nanoTime() - startNanos;
@@ -1278,7 +1336,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
logger.info("Successfully merged {} journal files ({} records) into single Provenance Log File {} in {} milliseconds", journalFiles.size(), records, mergedFile, millis);
}
- return mergedFile;
+ return writerFile;
}
@Override
@@ -1779,7 +1837,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
@Override
public void run() {
try {
- final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir);
+ final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager);
final StandardQueryResult queryResult = search.search(query, retrievalCount);
submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount());
if (queryResult.isFinished()) {
@@ -1787,7 +1845,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
query, indexDir, queryResult.getQueryTime(), queryResult.getTotalHitCount());
}
} catch (final Throwable t) {
- logger.error("Failed to query provenance repository due to {}", t.toString());
+ logger.error("Failed to query Provenance Repository Index {} due to {}", indexDir, t.toString());
if (logger.isDebugEnabled()) {
logger.error("", t);
}
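One pattern worth calling out in the rollover changes above: instead of submitting the merge once, the repository now schedules it with a fixed delay so that a failed merge is retried automatically, and on success the task cancels its own Future to stop further runs. A standalone sketch of that self-canceling retry pattern, with a stubbed-out task body (hypothetical class, not the repository code itself):

import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class SelfCancelingRetry {
    public static void main(final String[] args) {
        final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        final AtomicReference<Future<?>> futureRef = new AtomicReference<>();

        final Runnable task = new Runnable() {
            private int attempts = 0;

            @Override
            public void run() {
                if (++attempts < 3) {
                    return; // simulate a failure; the executor re-runs us after the delay
                }

                // Success: wait for the Future to be published, then cancel ourselves
                // so the executor stops re-scheduling this task.
                Future<?> future;
                while ((future = futureRef.get()) == null) {
                    try {
                        Thread.sleep(10L);
                    } catch (final InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
                future.cancel(false);
                executor.shutdown();
            }
        };

        futureRef.set(executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS));
    }
}

The busy-wait on the AtomicReference mirrors the repository code: the task can begin running before scheduleWithFixedDelay() has returned, so the Future may not yet be set when the task succeeds.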
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
index d47df4f..3951591 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
@@ -33,7 +33,8 @@ public class RepositoryConfiguration {
private long eventFileBytes = 1024L * 1024L * 5L; // 5 MB
private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB
private int journalCount = 16;
-
+ private int compressionBlockBytes = 1024 * 1024;
+
private List<SearchableField> searchableFields = new ArrayList<>();
private List<SearchableField> searchableAttributes = new ArrayList<>();
private boolean compress = true;
@@ -49,7 +50,16 @@ public class RepositoryConfiguration {
return allowRollover;
}
- /**
+
+ public int getCompressionBlockBytes() {
+ return compressionBlockBytes;
+ }
+
+ public void setCompressionBlockBytes(int compressionBlockBytes) {
+ this.compressionBlockBytes = compressionBlockBytes;
+ }
+
+ /**
* Specifies where the repository will store data
*
* @return
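The new compressionBlockBytes property (1 MB by default) caps how much uncompressed data is written into each compression block, which in turn bounds how much a reader must decompress past a TOC offset to reach an event. A hypothetical usage sketch, assuming the class's no-argument constructor:

// Smaller blocks mean faster random access at the cost of a worse compression ratio.
final RepositoryConfiguration config = new RepositoryConfiguration();
config.setCompressionBlockBytes(1024 * 1024); // 1 MB of uncompressed data per block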
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
index 5e4744b..9bbf195 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
@@ -17,41 +17,173 @@
package org.apache.nifi.provenance;
import java.io.DataInputStream;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.zip.GZIPInputStream;
+import org.apache.nifi.provenance.serialization.RecordReader;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.ByteCountingInputStream;
+import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.provenance.serialization.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class StandardRecordReader implements RecordReader {
-
- private final DataInputStream dis;
- private final ByteCountingInputStream byteCountingIn;
+ private static final Logger logger = LoggerFactory.getLogger(StandardRecordReader.class);
+
+ private final ByteCountingInputStream rawInputStream;
private final String filename;
private final int serializationVersion;
+ private final boolean compressed;
+ private final TocReader tocReader;
+ private final int headerLength;
+
+ private DataInputStream dis;
+ private ByteCountingInputStream byteCountingIn;
+
+ public StandardRecordReader(final InputStream in, final String filename) throws IOException {
+ this(in, filename, null);
+ }
+
+ public StandardRecordReader(final InputStream in, final String filename, final TocReader tocReader) throws IOException {
+ logger.trace("Creating RecordReader for {}", filename);
+
+ rawInputStream = new ByteCountingInputStream(in);
+
+ final InputStream limitedStream;
+ if ( tocReader == null ) {
+ limitedStream = rawInputStream;
+ } else {
+ final long offset1 = tocReader.getBlockOffset(1);
+ if ( offset1 < 0 ) {
+ limitedStream = rawInputStream;
+ } else {
+ limitedStream = new LimitingInputStream(rawInputStream, offset1 - rawInputStream.getBytesConsumed());
+ }
+ }
+
+ final InputStream readableStream;
+ if (filename.endsWith(".gz")) {
+ readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream));
+ compressed = true;
+ } else {
+ readableStream = new BufferedInputStream(limitedStream);
+ compressed = false;
+ }
- public StandardRecordReader(final InputStream in, final int serializationVersion, final String filename) {
- if (serializationVersion < 1 || serializationVersion > 7) {
- throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion + " and supported versions are 1-6");
+ byteCountingIn = new ByteCountingInputStream(readableStream);
+ dis = new DataInputStream(byteCountingIn);
+
+ final String repoClassName = dis.readUTF();
+ final int serializationVersion = dis.readInt();
+ headerLength = repoClassName.getBytes(StandardCharsets.UTF_8).length + 2 + 4; // 2 bytes for string length, 4 for integer.
+
+ if (serializationVersion < 1 || serializationVersion > 8) {
+ throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion + " and supported versions are 1-8");
}
- byteCountingIn = new ByteCountingInputStream(in);
- this.dis = new DataInputStream(byteCountingIn);
this.serializationVersion = serializationVersion;
this.filename = filename;
+ this.tocReader = tocReader;
+ }
+
+ @Override
+ public void skipToBlock(final int blockIndex) throws IOException {
+ if ( tocReader == null ) {
+ throw new IllegalStateException("Cannot skip to block " + blockIndex + " for Provenance Log " + filename + " because no Table-of-Contents file was found for this Log");
+ }
+
+ if ( blockIndex < 0 ) {
+ throw new IllegalArgumentException("Cannot skip to block " + blockIndex + " because the value is negative");
+ }
+
+ if ( blockIndex == getBlockIndex() ) {
+ return;
+ }
+
+ final long offset = tocReader.getBlockOffset(blockIndex);
+ if ( offset < 0 ) {
+ throw new IOException("Unable to find block " + blockIndex + " in Provenance Log " + filename);
+ }
+
+ final long curOffset = rawInputStream.getBytesConsumed();
+
+ final long bytesToSkip = offset - curOffset;
+ if ( bytesToSkip >= 0 ) {
+ try {
+ StreamUtils.skip(rawInputStream, bytesToSkip);
+ logger.debug("Skipped stream from offset {} to {} ({} bytes skipped)", curOffset, offset, bytesToSkip);
+ } catch (final IOException e) {
+ throw new IOException("Failed to skip to offset " + offset + " for block " + blockIndex + " of Provenance Log " + filename, e);
+ }
+
+ resetStreamForNextBlock();
+ }
}
+
+ private void resetStreamForNextBlock() throws IOException {
+ final InputStream limitedStream;
+ if ( tocReader == null ) {
+ limitedStream = rawInputStream;
+ } else {
+ final long offset = tocReader.getBlockOffset(1 + getBlockIndex());
+ if ( offset < 0 ) {
+ limitedStream = rawInputStream;
+ } else {
+ limitedStream = new LimitingInputStream(rawInputStream, offset - rawInputStream.getBytesConsumed());
+ }
+ }
+
+ final InputStream readableStream;
+ if (compressed) {
+ readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream));
+ } else {
+ readableStream = new BufferedInputStream(limitedStream);
+ }
+ byteCountingIn = new ByteCountingInputStream(readableStream, rawInputStream.getBytesConsumed());
+ dis = new DataInputStream(byteCountingIn);
+ }
+
+
+ @Override
+ public TocReader getTocReader() {
+ return tocReader;
+ }
+
+ @Override
+ public boolean isBlockIndexAvailable() {
+ return tocReader != null;
+ }
+
+ @Override
+ public int getBlockIndex() {
+ if ( tocReader == null ) {
+ throw new IllegalStateException("Cannot determine Block Index because no Table-of-Contents could be found for Provenance Log " + filename);
+ }
+
+ return tocReader.getBlockIndex(rawInputStream.getBytesConsumed());
+ }
+
+ @Override
+ public long getBytesConsumed() {
+ return byteCountingIn.getBytesConsumed();
+ }
+
private StandardProvenanceEventRecord readPreVersion6Record() throws IOException {
final long startOffset = byteCountingIn.getBytesConsumed();
- if (!isData(byteCountingIn)) {
+ if (!isData()) {
return null;
}
@@ -137,7 +269,7 @@ public class StandardRecordReader implements RecordReader {
final long startOffset = byteCountingIn.getBytesConsumed();
- if (!isData(byteCountingIn)) {
+ if (!isData()) {
return null;
}
@@ -242,9 +374,17 @@ public class StandardRecordReader implements RecordReader {
}
private String readUUID(final DataInputStream in) throws IOException {
- final long msb = in.readLong();
- final long lsb = in.readLong();
- return new UUID(msb, lsb).toString();
+ if ( serializationVersion < 8 ) {
+ final long msb = in.readLong();
+ final long lsb = in.readLong();
+ return new UUID(msb, lsb).toString();
+ } else {
+ // before version 8, we serialized UUIDs as two longs in order to
+ // write less data. However, in version 8 we changed to just writing
+ // out the string because it's extremely expensive to call UUID.fromString.
+ // In the end, since we generally compress, the savings is minimal anyway.
+ return in.readUTF();
+ }
}
private String readNullableString(final DataInputStream in) throws IOException {
@@ -272,16 +412,58 @@ public class StandardRecordReader implements RecordReader {
return new String(strBytes, "UTF-8");
}
- private boolean isData(final InputStream in) throws IOException {
- in.mark(1);
- final int nextByte = in.read();
- in.reset();
+ private boolean isData() throws IOException {
+ byteCountingIn.mark(1);
+ int nextByte = byteCountingIn.read();
+ byteCountingIn.reset();
+
+ if ( nextByte < 0 ) {
+ try {
+ resetStreamForNextBlock();
+ } catch (final EOFException eof) {
+ return false;
+ }
+
+ byteCountingIn.mark(1);
+ nextByte = byteCountingIn.read();
+ byteCountingIn.reset();
+ }
+
return (nextByte >= 0);
}
+
+ @Override
+ public long getMaxEventId() throws IOException {
+ if ( tocReader != null ) {
+ final long lastBlockOffset = tocReader.getLastBlockOffset();
+ skipToBlock(tocReader.getBlockIndex(lastBlockOffset));
+ }
+
+ ProvenanceEventRecord record;
+ ProvenanceEventRecord lastRecord = null;
+ try {
+ while ((record = nextRecord()) != null) {
+ lastRecord = record;
+ }
+ } catch (final EOFException eof) {
+ // This can happen if we stop NiFi while the record is being written.
+ // This is OK, we just ignore this record. The session will not have been
+ // committed, so we can just process the FlowFile again.
+ }
+
+ return (lastRecord == null) ? -1L : lastRecord.getEventId();
+ }
@Override
public void close() throws IOException {
+ logger.trace("Closing Record Reader for {}", filename);
+
dis.close();
+ rawInputStream.close();
+
+ if ( tocReader != null ) {
+ tocReader.close();
+ }
}
@Override
@@ -291,7 +473,10 @@ public class StandardRecordReader implements RecordReader {
@Override
public void skipTo(final long position) throws IOException {
- final long currentPosition = byteCountingIn.getBytesConsumed();
+ // we are subtracting headerLength from the number of bytes consumed because we used to
+ // consider the offset of the first record "0" - now we consider it whatever position
+ // it really is in the stream.
+ final long currentPosition = byteCountingIn.getBytesConsumed() - headerLength;
if (currentPosition == position) {
return;
}
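The key to skipToBlock() above is that every block is an independent GZIP member: once the raw stream is positioned at a TOC offset, a fresh GZIPInputStream can begin decompressing at that boundary. The commit skips forward on the already-open stream via StreamUtils.skip; a rough standalone sketch of the same idea using a seekable file (hypothetical helper, not the reader itself):

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;
import java.util.zip.GZIPInputStream;

class BlockSeekingReader {
    // Position the file at the start of a block (offset taken from the TOC)
    // and return a stream ready to read the block's first record.
    static DataInputStream openBlock(final RandomAccessFile raf, final long blockOffset) throws IOException {
        raf.seek(blockOffset);
        final InputStream raw = Channels.newInputStream(raf.getChannel());
        // Each block was written as its own GZIP member, so decompression
        // can start cleanly at the block boundary.
        return new DataInputStream(new BufferedInputStream(new GZIPInputStream(raw)));
    }
}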
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
index df93084..dbb2c48 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
@@ -19,38 +19,54 @@ package org.apache.nifi.provenance;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.Collection;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.provenance.toc.TocWriter;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.DataOutputStream;
-import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.stream.io.GZIPOutputStream;
+import org.apache.nifi.stream.io.NonCloseableOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class StandardRecordWriter implements RecordWriter {
-
+ private static final Logger logger = LoggerFactory.getLogger(StandardRecordWriter.class);
+
private final File file;
- private final DataOutputStream out;
- private final ByteCountingOutputStream byteCountingOut;
private final FileOutputStream fos;
+ private final ByteCountingOutputStream rawOutStream;
+ private final TocWriter tocWriter;
+ private final boolean compressed;
+ private final int uncompressedBlockSize;
+
+ private DataOutputStream out;
+ private ByteCountingOutputStream byteCountingOut;
+ private long lastBlockOffset = 0L;
private int recordCount = 0;
private final Lock lock = new ReentrantLock();
- public StandardRecordWriter(final File file) throws IOException {
+
+ public StandardRecordWriter(final File file, final TocWriter writer, final boolean compressed, final int uncompressedBlockSize) throws IOException {
+ logger.trace("Creating Record Writer for {}", file.getName());
+
this.file = file;
+ this.compressed = compressed;
this.fos = new FileOutputStream(file);
- this.byteCountingOut = new ByteCountingOutputStream(new BufferedOutputStream(fos, 65536));
- this.out = new DataOutputStream(byteCountingOut);
+ rawOutStream = new ByteCountingOutputStream(fos);
+ this.uncompressedBlockSize = uncompressedBlockSize;
+
+ this.tocWriter = writer;
}
static void writeUUID(final DataOutputStream out, final String uuid) throws IOException {
- final UUID uuidObj = UUID.fromString(uuid);
- out.writeLong(uuidObj.getMostSignificantBits());
- out.writeLong(uuidObj.getLeastSignificantBits());
+ out.writeUTF(uuid);
}
static void writeUUIDs(final DataOutputStream out, final Collection<String> list) throws IOException {
@@ -69,18 +85,67 @@ public class StandardRecordWriter implements RecordWriter {
return file;
}
- @Override
+ @Override
public synchronized void writeHeader() throws IOException {
+ lastBlockOffset = rawOutStream.getBytesWritten();
+ resetWriteStream();
+
out.writeUTF(PersistentProvenanceRepository.class.getName());
out.writeInt(PersistentProvenanceRepository.SERIALIZATION_VERSION);
out.flush();
}
+
+ private void resetWriteStream() throws IOException {
+ if ( out != null ) {
+ out.flush();
+ }
+
+ final long byteOffset = (byteCountingOut == null) ? rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten();
+
+ final OutputStream writableStream;
+ if ( compressed ) {
+ // because of the way that GZIPOutputStream works, we need to call close() on it in order for it
+ // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
+ // the underlying OutputStream in a NonCloseableOutputStream
+ if ( out != null ) {
+ out.close();
+ }
+
+ if ( tocWriter != null ) {
+ tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
+ }
+
+ writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536);
+ } else {
+ if ( tocWriter != null ) {
+ tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
+ }
+
+ writableStream = new BufferedOutputStream(rawOutStream, 65536);
+ }
+
+ this.byteCountingOut = new ByteCountingOutputStream(writableStream, byteOffset);
+ this.out = new DataOutputStream(byteCountingOut);
+ }
+
@Override
public synchronized long writeRecord(final ProvenanceEventRecord record, long recordIdentifier) throws IOException {
final ProvenanceEventType recordType = record.getEventType();
final long startBytes = byteCountingOut.getBytesWritten();
+ // add a new block to the TOC if needed.
+ if ( tocWriter != null && (startBytes - lastBlockOffset >= uncompressedBlockSize) ) {
+ lastBlockOffset = startBytes;
+
+ if ( compressed ) {
+ // because of the way that GZIPOutputStream works, we need to call close() on it in order for it
+ // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
+ // the underlying OutputStream in a NonCloseableOutputStream
+ resetWriteStream();
+ }
+ }
+
out.writeLong(recordIdentifier);
out.writeUTF(record.getEventType().name());
out.writeLong(record.getEventTime());
@@ -196,13 +261,24 @@ public class StandardRecordWriter implements RecordWriter {
@Override
public synchronized void close() throws IOException {
+ logger.trace("Closing Record Writer for {}", file.getName());
+
lock();
try {
- out.flush();
- out.close();
+ try {
+ out.flush();
+ out.close();
+ } finally {
+ rawOutStream.close();
+
+ if ( tocWriter != null ) {
+ tocWriter.close();
+ }
+ }
} finally {
unlock();
}
+
}
@Override
@@ -232,6 +308,14 @@ public class StandardRecordWriter implements RecordWriter {
@Override
public void sync() throws IOException {
- fos.getFD().sync();
+ if ( tocWriter != null ) {
+ tocWriter.sync();
+ }
+ fos.getFD().sync();
+ }
+
+ @Override
+ public TocWriter getTocWriter() {
+ return tocWriter;
}
}
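In outline, the block-reset logic above amounts to the following condensed sketch (a hypothetical helper, not the literal method; it assumes the NiFi stream wrappers imported above, including the GZIPOutputStream variant that takes a compression level, and omits the byte-counting wrapper for brevity):

    // Condensed sketch of resetWriteStream() for the compressed case: each block is a
    // complete GZIP member, so the previous block's stream must be close()d to emit its
    // trailer, while NonCloseableOutputStream keeps the underlying file stream open.
    private void startNewCompressedBlock() throws IOException {
        if (out != null) {
            out.close(); // flushes and writes the GZIP trailer of the previous block
        }
        tocWriter.addBlockOffset(rawOutStream.getBytesWritten()); // raw offset where the new block begins
        final OutputStream blockStream = new BufferedOutputStream(
                new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536);
        out = new DataOutputStream(blockStream);
    }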
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
index 4608419..7db04aa 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
@@ -16,25 +16,17 @@
*/
package org.apache.nifi.provenance.lucene;
-import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.util.List;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
import org.apache.nifi.provenance.IndexConfiguration;
import org.apache.nifi.provenance.PersistentProvenanceRepository;
-import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.provenance.expiration.ExpirationAction;
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordReaders;
-
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,10 +35,12 @@ public class DeleteIndexAction implements ExpirationAction {
private static final Logger logger = LoggerFactory.getLogger(DeleteIndexAction.class);
private final PersistentProvenanceRepository repository;
private final IndexConfiguration indexConfiguration;
+ private final IndexManager indexManager;
- public DeleteIndexAction(final PersistentProvenanceRepository repo, final IndexConfiguration indexConfiguration) {
+ public DeleteIndexAction(final PersistentProvenanceRepository repo, final IndexConfiguration indexConfiguration, final IndexManager indexManager) {
this.repository = repo;
this.indexConfiguration = indexConfiguration;
+ this.indexManager = indexManager;
}
@Override
@@ -55,51 +49,38 @@ public class DeleteIndexAction implements ExpirationAction {
long numDeleted = 0;
long maxEventId = -1L;
try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, repository.getAllLogFiles())) {
- try {
- StandardProvenanceEventRecord record;
- while ((record = reader.nextRecord()) != null) {
- numDeleted++;
-
- if (record.getEventId() > maxEventId) {
- maxEventId = record.getEventId();
- }
- }
- } catch (final EOFException eof) {
- // finished reading -- the last record was not completely written out, so it is discarded.
- }
- } catch (final EOFException eof) {
- // no data in file.
- return expiredFile;
+ maxEventId = reader.getMaxEventId();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath());
}
// remove the records from the index
final List<File> indexDirs = indexConfiguration.getIndexDirectories(expiredFile);
for (final File indexingDirectory : indexDirs) {
- try (final Directory directory = FSDirectory.open(indexingDirectory);
- final Analyzer analyzer = new StandardAnalyzer()) {
- IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
- config.setWriteLockTimeout(300000L);
-
- Term term = new Term(FieldNames.STORAGE_FILENAME, LuceneUtil.substringBefore(expiredFile.getName(), "."));
+ final Term term = new Term(FieldNames.STORAGE_FILENAME, LuceneUtil.substringBefore(expiredFile.getName(), "."));
- boolean deleteDir = false;
- try (final IndexWriter indexWriter = new IndexWriter(directory, config)) {
- indexWriter.deleteDocuments(term);
- indexWriter.commit();
- final int docsLeft = indexWriter.numDocs();
- deleteDir = (docsLeft <= 0);
- logger.debug("After expiring {}, there are {} docs left for index {}", expiredFile, docsLeft, indexingDirectory);
- }
+ boolean deleteDir = false;
+ final IndexWriter writer = indexManager.borrowIndexWriter(indexingDirectory);
+ try {
+ writer.deleteDocuments(term);
+ writer.commit();
+ final int docsLeft = writer.numDocs();
+ deleteDir = (docsLeft <= 0);
+ logger.debug("After expiring {}, there are {} docs left for index {}", expiredFile, docsLeft, indexingDirectory);
+ } finally {
+ indexManager.returnIndexWriter(indexingDirectory, writer);
+ }
- // we've confirmed that all documents have been removed. Delete the index directory.
- if (deleteDir) {
- indexConfiguration.removeIndexDirectory(indexingDirectory);
- deleteDirectory(indexingDirectory);
- logger.info("Removed empty index directory {}", indexingDirectory);
- }
+ // we've confirmed that all documents have been removed. Delete the index directory.
+ if (deleteDir) {
+ indexManager.removeIndex(indexingDirectory);
+ indexConfiguration.removeIndexDirectory(indexingDirectory);
+
+ deleteDirectory(indexingDirectory);
+ logger.info("Removed empty index directory {}", indexingDirectory);
}
}
-
+
// Update the minimum index to 1 more than the max Event ID in this file.
if (maxEventId > -1L) {
indexConfiguration.setMinIdIndexed(maxEventId + 1L);
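The key contract this change introduces is that every writer borrowed from the IndexManager must be returned in a finally block so the manager's reference counts stay balanced. In outline, the caller-side pattern used above is:

    final IndexWriter writer = indexManager.borrowIndexWriter(indexingDirectory);
    try {
        writer.deleteDocuments(term);
        writer.commit();
    } finally {
        // balances the borrow; the manager closes the writer once no caller holds it
        indexManager.returnIndexWriter(indexingDirectory, writer);
    }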
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
index 6446a35..5a77f42 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
@@ -23,23 +23,30 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordReaders;
-
+import org.apache.nifi.provenance.toc.TocReader;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexableField;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class DocsReader {
-
+ private final Logger logger = LoggerFactory.getLogger(DocsReader.class);
+
public DocsReader(final List<File> storageDirectories) {
}
@@ -48,6 +55,7 @@ public class DocsReader {
return Collections.emptySet();
}
+ final long start = System.nanoTime();
final int numDocs = Math.min(topDocs.scoreDocs.length, maxResults);
final List<Document> docs = new ArrayList<>(numDocs);
@@ -60,63 +68,102 @@ public class DocsReader {
}
}
+ final long readDocuments = System.nanoTime() - start;
+ logger.debug("Reading {} Lucene Documents took {} millis", docs.size(), TimeUnit.NANOSECONDS.toMillis(readDocuments));
return read(docs, allProvenanceLogFiles);
}
+
+ private long getByteOffset(final Document d, final RecordReader reader) {
+ final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
+ if ( blockField != null ) {
+ final int blockIndex = blockField.numericValue().intValue();
+ final TocReader tocReader = reader.getTocReader();
+ return tocReader.getBlockOffset(blockIndex);
+ }
+
+ return d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
+ }
+
+
+ private ProvenanceEventRecord getRecord(final Document d, final RecordReader reader) throws IOException {
+ IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
+ if ( blockField == null ) {
+ reader.skipTo(getByteOffset(d, reader));
+ } else {
+ reader.skipToBlock(blockField.numericValue().intValue());
+ }
+
+ StandardProvenanceEventRecord record;
+ while ( (record = reader.nextRecord()) != null) {
+ IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());
+ if ( idField == null || idField.numericValue().longValue() == record.getEventId() ) {
+ break;
+ }
+ }
+
+ if ( record == null ) {
+ throw new IOException("Failed to find Provenance Event " + d);
+ } else {
+ return record;
+ }
+ }
+
+
public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles) throws IOException {
LuceneUtil.sortDocsForRetrieval(docs);
RecordReader reader = null;
String lastStorageFilename = null;
- long lastByteOffset = 0L;
final Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>();
+ final long start = System.nanoTime();
+ int logFileCount = 0;
+
+ final Set<String> storageFilesToSkip = new HashSet<>();
+
try {
for (final Document d : docs) {
final String storageFilename = d.getField(FieldNames.STORAGE_FILENAME).stringValue();
- final long byteOffset = d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
-
+ if ( storageFilesToSkip.contains(storageFilename) ) {
+ continue;
+ }
+
try {
- if (reader != null && storageFilename.equals(lastStorageFilename) && byteOffset > lastByteOffset) {
- // Still the same file and the offset is downstream.
- try {
- reader.skipTo(byteOffset);
- final StandardProvenanceEventRecord record = reader.nextRecord();
- matchingRecords.add(record);
- } catch (final IOException e) {
- throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository");
- }
-
+ if (reader != null && storageFilename.equals(lastStorageFilename)) {
+ matchingRecords.add(getRecord(d, reader));
} else {
+ logger.debug("Opening log file {}", storageFilename);
+
+ logFileCount++;
if (reader != null) {
reader.close();
}
List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
if (potentialFiles.isEmpty()) {
- throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository");
+ logger.warn("Could not find Provenance Log File with basename {} in the "
+ + "Provenance Repository; assuming file has expired and continuing without it", storageFilename);
+ storageFilesToSkip.add(storageFilename);
+ continue;
}
if (potentialFiles.size() > 1) {
- throw new FileNotFoundException("Found multiple Provenance Log Files with basename " + storageFilename + " in the Provenance Repository");
+ throw new FileNotFoundException("Found multiple Provenance Log Files with basename " +
+ storageFilename + " in the Provenance Repository");
}
for (final File file : potentialFiles) {
- reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles);
-
try {
- reader.skip(byteOffset);
-
- final StandardProvenanceEventRecord record = reader.nextRecord();
- matchingRecords.add(record);
+ reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles);
+ matchingRecords.add(getRecord(d, reader));
} catch (final IOException e) {
- throw new IOException("Failed to retrieve record from Provenance File " + file + " due to " + e, e);
+ throw new IOException("Failed to retrieve record " + d + " from Provenance File " + file + " due to " + e, e);
}
}
}
} finally {
lastStorageFilename = storageFilename;
- lastByteOffset = byteOffset;
}
}
} finally {
@@ -125,6 +172,9 @@ public class DocsReader {
}
}
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ logger.debug("Took {} ms to read {} events from {} prov log files", millis, matchingRecords.size(), logFileCount);
+
return matchingRecords;
}
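Resolution of a search hit now happens in two steps: the Document carries either an absolute byte offset (old format) or a compression-block index plus the event ID (new format). In the latter case the reader seeks to the block's start via the table of contents and scans forward until the IDs match, as getRecord() above does. Roughly:

    // Sketch of the lookup performed by getRecord() above.
    final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
    if (blockField == null) {
        reader.skipTo(getByteOffset(d, reader));                  // old format: absolute byte offset
    } else {
        reader.skipToBlock(blockField.numericValue().intValue()); // new format: TOC maps block -> offset
    }
    // then scan forward within the block until the stored event ID matches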
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java
index 6afc193..90a73f4 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java
@@ -20,4 +20,5 @@ public class FieldNames {
public static final String STORAGE_FILENAME = "storage-filename";
public static final String STORAGE_FILE_OFFSET = "storage-fileOffset";
+ public static final String BLOCK_INDEX = "block-index";
}
[06/10] incubator-nifi git commit: NIFI-527: Compress prov logs in
'chunks' and just store the chunk offsets in Lucene instead of byte offsets
Posted by ma...@apache.org.
NIFI-527: Compress prov logs in 'chunks' and just store the chunk offsets in Lucene instead of byte offsets
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/7c41225e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/7c41225e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/7c41225e
Branch: refs/heads/develop
Commit: 7c41225e89e05fe1a234920fd1aa8ca248f43850
Parents: a5ac48a
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Apr 27 10:41:18 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Apr 27 10:41:18 2015 -0400
----------------------------------------------------------------------
.../nifi/stream/io/ByteCountingInputStream.java | 5 +++++
.../nifi/stream/io/ByteCountingOutputStream.java | 8 ++++++++
.../TestPersistentProvenanceRepository.java | 19 ++++++++++++++++++-
3 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7c41225e/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
index 8294af3..d1ed023 100644
--- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
+++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
@@ -31,6 +31,11 @@ public class ByteCountingInputStream extends InputStream {
this.in = in;
}
+ public ByteCountingInputStream(final InputStream in, final long initialOffset) {
+ this.in = in;
+ this.bytesSkipped = initialOffset;
+ }
+
@Override
public int read() throws IOException {
final int fromSuper = in.read();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7c41225e/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
index 3e3e3fe..e71937e 100644
--- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
+++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
@@ -27,6 +27,12 @@ public class ByteCountingOutputStream extends OutputStream {
public ByteCountingOutputStream(final OutputStream out) {
this.out = out;
}
+
+ public ByteCountingOutputStream(final OutputStream out, final long initialByteCount) {
+ this.out = out;
+ this.bytesWritten = initialByteCount;
+ }
+
@Override
public void write(int b) throws IOException {
@@ -39,6 +45,8 @@ public class ByteCountingOutputStream extends OutputStream {
write(b, 0, b.length);
}
+
@Override
public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len);
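These initial-count constructors exist so that a counting stream can be recreated mid-file without losing its tally: when StandardRecordWriter resets its write stream at a block boundary, it seeds the replacement counter with the bytes already written, keeping record offsets monotonic across resets. From the writer code earlier in this series:

    // Carry the logical byte count forward across a stream reset so that
    // getBytesWritten() never moves backwards.
    final long byteOffset = (byteCountingOut == null)
            ? rawOutStream.getBytesWritten()
            : byteCountingOut.getBytesWritten();
    byteCountingOut = new ByteCountingOutputStream(writableStream, byteOffset);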
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7c41225e/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 25a363f..5541ab5 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -110,7 +110,24 @@ public class TestPersistentProvenanceRepository {
// we create but also to ensure that we have closed all of the file handles. If we leave any
// streams open, for instance, this will throw an IOException, causing our unit test to fail.
for ( final File storageDir : config.getStorageDirectories() ) {
- FileUtils.deleteFile(storageDir, true);
+ int i;
+ for (i=0; i < 3; i++) {
+ try {
+ FileUtils.deleteFile(storageDir, true);
+ break;
+ } catch (final IOException ioe) {
+ // if there is a virus scanner, etc. running in the background we may not be able to
+ // delete the file. Wait a sec and try again.
+ if ( i == 2 ) {
+ throw ioe;
+ } else {
+ try {
+ Thread.sleep(1000L);
+ } catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt(); // restore the interrupt status rather than swallowing it
+ }
+ }
+ }
+ }
}
}
[04/10] incubator-nifi git commit: NIFI-527: Merging develop
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
new file mode 100644
index 0000000..3943504
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.lucene;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IndexManager implements Closeable {
+ private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);
+
+ private final Lock lock = new ReentrantLock();
+ private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
+ private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
+
+
+ public void removeIndex(final File indexDirectory) {
+ final File absoluteFile = indexDirectory.getAbsoluteFile();
+ logger.info("Removing index {}", indexDirectory);
+
+ lock.lock();
+ try {
+ final IndexWriterCount count = writerCounts.remove(absoluteFile);
+ if ( count != null ) {
+ try {
+ count.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ }
+
+ for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) {
+ for ( final ActiveIndexSearcher searcher : searcherList ) {
+ try {
+ searcher.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Searcher {} for {} due to {}",
+ searcher.getSearcher(), absoluteFile, ioe);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
+ final File absoluteFile = indexingDirectory.getAbsoluteFile();
+ logger.debug("Borrowing index writer for {}", indexingDirectory);
+
+ lock.lock();
+ try {
+ IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+ if ( writerCount == null ) {
+ final List<Closeable> closeables = new ArrayList<>();
+ final Directory directory = FSDirectory.open(indexingDirectory);
+ closeables.add(directory);
+
+ try {
+ final Analyzer analyzer = new StandardAnalyzer();
+ closeables.add(analyzer);
+
+ final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
+ config.setWriteLockTimeout(300000L);
+
+ final IndexWriter indexWriter = new IndexWriter(directory, config);
+ writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1);
+ logger.debug("Providing new index writer for {}", indexingDirectory);
+ } catch (final IOException ioe) {
+ for ( final Closeable closeable : closeables ) {
+ try {
+ closeable.close();
+ } catch (final IOException ioe2) {
+ ioe.addSuppressed(ioe2);
+ }
+ }
+
+ throw ioe;
+ }
+
+ writerCounts.put(absoluteFile, writerCount);
+ } else {
+ logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
+ writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+ writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+ }
+
+ return writerCount.getWriter();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
+ final File absoluteFile = indexingDirectory.getAbsoluteFile();
+ logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory);
+
+ lock.lock();
+ try {
+ IndexWriterCount count = writerCounts.remove(absoluteFile);
+
+ try {
+ if ( count == null ) {
+ logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
+ + "This could potentially lead to a resource leak", writer, indexingDirectory);
+ writer.close();
+ } else if ( count.getCount() <= 1 ) {
+ // we are finished with this writer.
+ logger.debug("Closing Index Writer for {}", indexingDirectory);
+ count.close();
+ } else {
+ // decrement the count.
+ logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
+ writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
+ }
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+
+ public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
+ final File absoluteFile = indexDir.getAbsoluteFile();
+ logger.debug("Borrowing index searcher for {}", indexDir);
+
+ lock.lock();
+ try {
+ // check if we already have a reader cached.
+ List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
+ if ( currentlyCached == null ) {
+ currentlyCached = new ArrayList<>();
+ activeSearchers.put(absoluteFile, currentlyCached);
+ } else {
+ // keep track of any searchers that have been closed so that we can remove them
+ // from our cache later.
+ final Set<ActiveIndexSearcher> expired = new HashSet<>();
+
+ try {
+ for ( final ActiveIndexSearcher searcher : currentlyCached ) {
+ if ( searcher.isCache() ) {
+ final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
+ if ( refCount <= 0 ) {
+ // if refCount == 0, then the reader has been closed, so we need to discard the searcher
+ logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
+ + "removing cached searcher", absoluteFile, refCount);
+ expired.add(searcher);
+ continue;
+ }
+
+ logger.debug("Providing previously cached index searcher for {}", indexDir);
+ return searcher.getSearcher();
+ }
+ }
+ } finally {
+ // if we have any expired index searchers, we need to close them and remove them
+ // from the cache so that we don't try to use them again later.
+ for ( final ActiveIndexSearcher searcher : expired ) {
+ try {
+ searcher.close();
+ } catch (final Exception e) {
+ logger.debug("Failed to close 'expired' IndexSearcher {}", searcher);
+ }
+
+ currentlyCached.remove(searcher);
+ }
+ }
+ }
+
+ IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+ if ( writerCount == null ) {
+ final Directory directory = FSDirectory.open(absoluteFile);
+ logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);
+
+ try {
+ final DirectoryReader directoryReader = DirectoryReader.open(directory);
+ final IndexSearcher searcher = new IndexSearcher(directoryReader);
+
+ // we want to cache the searcher that we create, since it's just a reader.
+ final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true);
+ currentlyCached.add(cached);
+
+ return cached.getSearcher();
+ } catch (final IOException e) {
+ try {
+ directory.close();
+ } catch (final IOException ioe) {
+ e.addSuppressed(ioe);
+ }
+
+ throw e;
+ }
+ } else {
+ logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing "
+ + "counter to {}", indexDir, writerCount.getCount() + 1);
+
+ // increment the writer count to ensure that it's kept open.
+ writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+ writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+
+ // create a new Index Searcher from the writer so that we don't have an issue with trying
+ // to read from a directory that's locked. If we get the "no segments* file found" with
+ // Lucene, this indicates that an IndexWriter already has the directory open.
+ final IndexWriter writer = writerCount.getWriter();
+ final DirectoryReader directoryReader = DirectoryReader.open(writer, false);
+ final IndexSearcher searcher = new IndexSearcher(directoryReader);
+
+ // we don't want to cache this searcher because it's based on a writer, so we want to get
+ // new values the next time that we search.
+ final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false);
+
+ currentlyCached.add(activeSearcher);
+ return activeSearcher.getSearcher();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+
+ public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
+ final File absoluteFile = indexDirectory.getAbsoluteFile();
+ logger.debug("Returning index searcher for {} to IndexManager", indexDirectory);
+
+ lock.lock();
+ try {
+ // check if we already have a reader cached.
+ List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
+ if ( currentlyCached == null ) {
+ logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
+ + "result in a resource leak", indexDirectory);
+ return;
+ }
+
+ final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator();
+ while (itr.hasNext()) {
+ final ActiveIndexSearcher activeSearcher = itr.next();
+ if ( activeSearcher.getSearcher().equals(searcher) ) {
+ if ( activeSearcher.isCache() ) {
+ // the searcher is cached. Just leave it open.
+ logger.debug("Index searcher for {} is cached; leaving open", indexDirectory);
+ return;
+ } else {
+ // searcher is not cached. It was created from a writer, and we want
+ // the newest updates the next time that we get a searcher, so we will
+ // go ahead and close this one out.
+ itr.remove();
+
+ // decrement the writer count because we incremented it when creating the searcher
+ final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+ if ( writerCount != null ) {
+ if ( writerCount.getCount() <= 1 ) {
+ try {
+ logger.debug("Index searcher for {} is not cached. Writer count is "
+ + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1);
+
+ writerCount.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ } else {
+ logger.debug("Index searcher for {} is not cached. Writer count is decremented "
+ + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);
+
+ writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+ writerCount.getAnalyzer(), writerCount.getDirectory(),
+ writerCount.getCount() - 1));
+ }
+ }
+
+ try {
+ logger.debug("Closing Index Searcher for {}", indexDirectory);
+ activeSearcher.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ }
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ logger.debug("Closing Index Manager");
+
+ lock.lock();
+ try {
+ IOException ioe = null;
+
+ for ( final IndexWriterCount count : writerCounts.values() ) {
+ try {
+ count.close();
+ } catch (final IOException e) {
+ if ( ioe == null ) {
+ ioe = e;
+ } else {
+ ioe.addSuppressed(e);
+ }
+ }
+ }
+
+ for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) {
+ for (final ActiveIndexSearcher searcher : searcherList) {
+ try {
+ searcher.close();
+ } catch (final IOException e) {
+ if ( ioe == null ) {
+ ioe = e;
+ } else {
+ ioe.addSuppressed(e);
+ }
+ }
+ }
+ }
+
+ if ( ioe != null ) {
+ throw ioe;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+
+ private static void close(final Closeable... closeables) throws IOException {
+ IOException ioe = null;
+ for ( final Closeable closeable : closeables ) {
+ if ( closeable == null ) {
+ continue;
+ }
+
+ try {
+ closeable.close();
+ } catch (final IOException e) {
+ if ( ioe == null ) {
+ ioe = e;
+ } else {
+ ioe.addSuppressed(e);
+ }
+ }
+ }
+
+ if ( ioe != null ) {
+ throw ioe;
+ }
+ }
+
+
+ private static class ActiveIndexSearcher implements Closeable {
+ private final IndexSearcher searcher;
+ private final DirectoryReader directoryReader;
+ private final Directory directory;
+ private final boolean cache;
+
+ public ActiveIndexSearcher(IndexSearcher searcher, DirectoryReader directoryReader,
+ Directory directory, final boolean cache) {
+ this.searcher = searcher;
+ this.directoryReader = directoryReader;
+ this.directory = directory;
+ this.cache = cache;
+ }
+
+ public boolean isCache() {
+ return cache;
+ }
+
+ public IndexSearcher getSearcher() {
+ return searcher;
+ }
+
+ @Override
+ public void close() throws IOException {
+ IndexManager.close(directoryReader, directory);
+ }
+ }
+
+
+ private static class IndexWriterCount implements Closeable {
+ private final IndexWriter writer;
+ private final Analyzer analyzer;
+ private final Directory directory;
+ private final int count;
+
+ public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
+ this.writer = writer;
+ this.analyzer = analyzer;
+ this.directory = directory;
+ this.count = count;
+ }
+
+ public Analyzer getAnalyzer() {
+ return analyzer;
+ }
+
+ public Directory getDirectory() {
+ return directory;
+ }
+
+ public IndexWriter getWriter() {
+ return writer;
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ @Override
+ public void close() throws IOException {
+ IndexManager.close(writer, analyzer, directory);
+ }
+ }
+
+}
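Searchers follow the same borrow/return discipline as writers; a minimal caller-side sketch (mirroring how IndexSearch uses the manager later in this series):

    final IndexSearcher searcher = indexManager.borrowIndexSearcher(indexDirectory);
    try {
        final TopDocs topDocs = searcher.search(luceneQuery, maxResults);
        // ... resolve hits to provenance events ...
    } finally {
        // cached, reader-only searchers stay open; writer-backed searchers are closed
        // and the writer's reference count is decremented
        indexManager.returnIndexSearcher(indexDirectory, searcher);
    }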
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
index e2854c3..dcb6e08 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
@@ -17,31 +17,33 @@
package org.apache.nifi.provenance.lucene;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.Date;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.nifi.provenance.PersistentProvenanceRepository;
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.provenance.StandardQueryResult;
-
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexNotFoundException;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TopDocs;
-import org.apache.lucene.store.FSDirectory;
+import org.apache.nifi.provenance.PersistentProvenanceRepository;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.StandardQueryResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class IndexSearch {
-
+ private final Logger logger = LoggerFactory.getLogger(IndexSearch.class);
private final PersistentProvenanceRepository repository;
private final File indexDirectory;
+ private final IndexManager indexManager;
- public IndexSearch(final PersistentProvenanceRepository repo, final File indexDirectory) {
+ public IndexSearch(final PersistentProvenanceRepository repo, final File indexDirectory, final IndexManager indexManager) {
this.repository = repo;
this.indexDirectory = indexDirectory;
+ this.indexManager = indexManager;
}
public StandardQueryResult search(final org.apache.nifi.provenance.search.Query provenanceQuery, final AtomicInteger retrievedCount) throws IOException {
@@ -55,30 +57,57 @@ public class IndexSearch {
final StandardQueryResult sqr = new StandardQueryResult(provenanceQuery, 1);
final Set<ProvenanceEventRecord> matchingRecords;
- try (final DirectoryReader directoryReader = DirectoryReader.open(FSDirectory.open(indexDirectory))) {
- final IndexSearcher searcher = new IndexSearcher(directoryReader);
-
- if (provenanceQuery.getEndDate() == null) {
- provenanceQuery.setEndDate(new Date());
- }
- final Query luceneQuery = LuceneUtil.convertQuery(provenanceQuery);
+ if (provenanceQuery.getEndDate() == null) {
+ provenanceQuery.setEndDate(new Date());
+ }
+ final Query luceneQuery = LuceneUtil.convertQuery(provenanceQuery);
- TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults());
+ final long start = System.nanoTime();
+ IndexSearcher searcher = null;
+ try {
+ searcher = indexManager.borrowIndexSearcher(indexDirectory);
+ final long searchStartNanos = System.nanoTime();
+ final long openSearcherNanos = searchStartNanos - start;
+
+ final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults());
+ final long finishSearch = System.nanoTime();
+ final long searchNanos = finishSearch - searchStartNanos;
+
+ logger.debug("Searching {} took {} millis; opening searcher took {} millis", this,
+ TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
+
if (topDocs.totalHits == 0) {
sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
return sqr;
}
final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories());
- matchingRecords = docsReader.read(topDocs, directoryReader, repository.getAllLogFiles(), retrievedCount, provenanceQuery.getMaxResults());
-
+ matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount, provenanceQuery.getMaxResults());
+
+ final long readRecordsNanos = System.nanoTime() - finishSearch;
+ logger.debug("Reading {} records took {} millis for {}", matchingRecords.size(), TimeUnit.NANOSECONDS.toMillis(readRecordsNanos), this);
+
sqr.update(matchingRecords, topDocs.totalHits);
return sqr;
- } catch (final IndexNotFoundException e) {
- // nothing has been indexed yet.
+ } catch (final FileNotFoundException e) {
+ // nothing has been indexed yet, or the data has already aged off
+ logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, e);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", e);
+ }
+
sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
return sqr;
+ } finally {
+ if ( searcher != null ) {
+ indexManager.returnIndexSearcher(indexDirectory, searcher);
+ }
}
}
+
+ @Override
+ public String toString() {
+ return "IndexSearcher[" + indexDirectory + "]";
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
index 214267a..5e87913 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
@@ -24,27 +24,27 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.provenance.IndexConfiguration;
-import org.apache.nifi.provenance.PersistentProvenanceRepository;
-import org.apache.nifi.provenance.ProvenanceEventType;
-import org.apache.nifi.provenance.SearchableFields;
-import org.apache.nifi.provenance.StandardProvenanceEventRecord;
-import org.apache.nifi.provenance.rollover.RolloverAction;
-import org.apache.nifi.provenance.search.SearchableField;
-import org.apache.nifi.provenance.serialization.RecordReader;
-import org.apache.nifi.provenance.serialization.RecordReaders;
-
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.IntField;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.provenance.IndexConfiguration;
+import org.apache.nifi.provenance.PersistentProvenanceRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.SearchableFields;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.rollover.RolloverAction;
+import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.provenance.serialization.RecordReader;
+import org.apache.nifi.provenance.serialization.RecordReaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,15 +72,93 @@ public class IndexingAction implements RolloverAction {
doc.add(new StringField(field.getSearchableFieldName(), value.toLowerCase(), store));
}
+
+ public void index(final StandardProvenanceEventRecord record, final IndexWriter indexWriter, final Integer blockIndex) throws IOException {
+ final Map<String, String> attributes = record.getAttributes();
+
+ final Document doc = new Document();
+ addField(doc, SearchableFields.FlowFileUUID, record.getFlowFileUuid(), Store.NO);
+ addField(doc, SearchableFields.Filename, attributes.get(CoreAttributes.FILENAME.key()), Store.NO);
+ addField(doc, SearchableFields.ComponentID, record.getComponentId(), Store.NO);
+ addField(doc, SearchableFields.AlternateIdentifierURI, record.getAlternateIdentifierUri(), Store.NO);
+ addField(doc, SearchableFields.EventType, record.getEventType().name(), Store.NO);
+ addField(doc, SearchableFields.Relationship, record.getRelationship(), Store.NO);
+ addField(doc, SearchableFields.Details, record.getDetails(), Store.NO);
+ addField(doc, SearchableFields.ContentClaimSection, record.getContentClaimSection(), Store.NO);
+ addField(doc, SearchableFields.ContentClaimContainer, record.getContentClaimContainer(), Store.NO);
+ addField(doc, SearchableFields.ContentClaimIdentifier, record.getContentClaimIdentifier(), Store.NO);
+ addField(doc, SearchableFields.SourceQueueIdentifier, record.getSourceQueueIdentifier(), Store.NO);
+
+ if (nonAttributeSearchableFields.contains(SearchableFields.TransitURI)) {
+ addField(doc, SearchableFields.TransitURI, record.getTransitUri(), Store.NO);
+ }
+
+ for (final SearchableField searchableField : attributeSearchableFields) {
+ addField(doc, searchableField, attributes.get(searchableField.getSearchableFieldName()), Store.NO);
+ }
+
+ final String storageFilename = LuceneUtil.substringBefore(record.getStorageFilename(), ".");
+
+ // Index the fields that we always index (unless there's nothing else to index at all)
+ if (!doc.getFields().isEmpty()) {
+ doc.add(new LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), record.getLineageStartDate(), Store.NO));
+ doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), record.getEventTime(), Store.NO));
+ doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), record.getFileSize(), Store.NO));
+ doc.add(new StringField(FieldNames.STORAGE_FILENAME, storageFilename, Store.YES));
+
+ if ( blockIndex == null ) {
+ doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset(), Store.YES));
+ } else {
+ doc.add(new IntField(FieldNames.BLOCK_INDEX, blockIndex, Store.YES));
+ doc.add(new LongField(SearchableFields.Identifier.getSearchableFieldName(), record.getEventId(), Store.YES));
+ }
+
+ for (final String lineageIdentifier : record.getLineageIdentifiers()) {
+ addField(doc, SearchableFields.LineageIdentifier, lineageIdentifier, Store.NO);
+ }
+
+ // If the event is a FORK, CLONE, or REPLAY, add the FlowFileUUID of each child; for a JOIN, add each parent's UUID.
+ if (record.getEventType() == ProvenanceEventType.FORK || record.getEventType() == ProvenanceEventType.CLONE || record.getEventType() == ProvenanceEventType.REPLAY) {
+ for (final String uuid : record.getChildUuids()) {
+ if (!uuid.equals(record.getFlowFileUuid())) {
+ addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO);
+ }
+ }
+ } else if (record.getEventType() == ProvenanceEventType.JOIN) {
+ for (final String uuid : record.getParentUuids()) {
+ if (!uuid.equals(record.getFlowFileUuid())) {
+ addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO);
+ }
+ }
+ } else if (record.getEventType() == ProvenanceEventType.RECEIVE && record.getSourceSystemFlowFileIdentifier() != null) {
+ // If we get a receive with a Source System FlowFile Identifier, we add another Document that shows the UUID
+ // that the Source System uses to refer to the data.
+ final String sourceIdentifier = record.getSourceSystemFlowFileIdentifier();
+ final String sourceFlowFileUUID;
+ final int lastColon = sourceIdentifier.lastIndexOf(":");
+ if (lastColon > -1 && lastColon < sourceIdentifier.length() - 2) {
+ sourceFlowFileUUID = sourceIdentifier.substring(lastColon + 1);
+ } else {
+ sourceFlowFileUUID = null;
+ }
+
+ if (sourceFlowFileUUID != null) {
+ addField(doc, SearchableFields.FlowFileUUID, sourceFlowFileUUID, Store.NO);
+ }
+ }
+
+ indexWriter.addDocument(doc);
+ }
+ }
+
@Override
- @SuppressWarnings("deprecation")
public File execute(final File fileRolledOver) throws IOException {
final File indexingDirectory = indexConfiguration.getWritableIndexDirectory(fileRolledOver);
int indexCount = 0;
long maxId = -1L;
try (final Directory directory = FSDirectory.open(indexingDirectory);
- final Analyzer analyzer = new StandardAnalyzer(LuceneUtil.LUCENE_VERSION)) {
+ final Analyzer analyzer = new StandardAnalyzer()) {
final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
config.setWriteLockTimeout(300000L);
@@ -89,6 +167,13 @@ public class IndexingAction implements RolloverAction {
final RecordReader reader = RecordReaders.newRecordReader(fileRolledOver, repository.getAllLogFiles())) {
StandardProvenanceEventRecord record;
while (true) {
+ final Integer blockIndex;
+ if ( reader.isBlockIndexAvailable() ) {
+ blockIndex = reader.getBlockIndex();
+ } else {
+ blockIndex = null;
+ }
+
try {
record = reader.nextRecord();
} catch (final EOFException eof) {
@@ -104,76 +189,8 @@ public class IndexingAction implements RolloverAction {
maxId = record.getEventId();
- final Map<String, String> attributes = record.getAttributes();
-
- final Document doc = new Document();
- addField(doc, SearchableFields.FlowFileUUID, record.getFlowFileUuid(), Store.NO);
- addField(doc, SearchableFields.Filename, attributes.get(CoreAttributes.FILENAME.key()), Store.NO);
- addField(doc, SearchableFields.ComponentID, record.getComponentId(), Store.NO);
- addField(doc, SearchableFields.AlternateIdentifierURI, record.getAlternateIdentifierUri(), Store.NO);
- addField(doc, SearchableFields.EventType, record.getEventType().name(), Store.NO);
- addField(doc, SearchableFields.Relationship, record.getRelationship(), Store.NO);
- addField(doc, SearchableFields.Details, record.getDetails(), Store.NO);
- addField(doc, SearchableFields.ContentClaimSection, record.getContentClaimSection(), Store.NO);
- addField(doc, SearchableFields.ContentClaimContainer, record.getContentClaimContainer(), Store.NO);
- addField(doc, SearchableFields.ContentClaimIdentifier, record.getContentClaimIdentifier(), Store.NO);
- addField(doc, SearchableFields.SourceQueueIdentifier, record.getSourceQueueIdentifier(), Store.NO);
-
- if (nonAttributeSearchableFields.contains(SearchableFields.TransitURI)) {
- addField(doc, SearchableFields.TransitURI, record.getTransitUri(), Store.NO);
- }
-
- for (final SearchableField searchableField : attributeSearchableFields) {
- addField(doc, searchableField, attributes.get(searchableField.getSearchableFieldName()), Store.NO);
- }
-
- final String storageFilename = LuceneUtil.substringBefore(record.getStorageFilename(), ".");
-
- // Index the fields that we always index (unless there's nothing else to index at all)
- if (!doc.getFields().isEmpty()) {
- doc.add(new LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), record.getLineageStartDate(), Store.NO));
- doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), record.getEventTime(), Store.NO));
- doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), record.getFileSize(), Store.NO));
- doc.add(new StringField(FieldNames.STORAGE_FILENAME, storageFilename, Store.YES));
- doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset(), Store.YES));
-
- for (final String lineageIdentifier : record.getLineageIdentifiers()) {
- addField(doc, SearchableFields.LineageIdentifier, lineageIdentifier, Store.NO);
- }
-
- // If it's event is a FORK, or JOIN, add the FlowFileUUID for all child/parent UUIDs.
- if (record.getEventType() == ProvenanceEventType.FORK || record.getEventType() == ProvenanceEventType.CLONE || record.getEventType() == ProvenanceEventType.REPLAY) {
- for (final String uuid : record.getChildUuids()) {
- if (!uuid.equals(record.getFlowFileUuid())) {
- addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO);
- }
- }
- } else if (record.getEventType() == ProvenanceEventType.JOIN) {
- for (final String uuid : record.getParentUuids()) {
- if (!uuid.equals(record.getFlowFileUuid())) {
- addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO);
- }
- }
- } else if (record.getEventType() == ProvenanceEventType.RECEIVE && record.getSourceSystemFlowFileIdentifier() != null) {
- // If we get a receive with a Source System FlowFile Identifier, we add another Document that shows the UUID
- // that the Source System uses to refer to the data.
- final String sourceIdentifier = record.getSourceSystemFlowFileIdentifier();
- final String sourceFlowFileUUID;
- final int lastColon = sourceIdentifier.lastIndexOf(":");
- if (lastColon > -1 && lastColon < sourceIdentifier.length() - 2) {
- sourceFlowFileUUID = sourceIdentifier.substring(lastColon + 1);
- } else {
- sourceFlowFileUUID = null;
- }
-
- if (sourceFlowFileUUID != null) {
- addField(doc, SearchableFields.FlowFileUUID, sourceFlowFileUUID, Store.NO);
- }
- }
-
- indexWriter.addDocument(doc);
- indexCount++;
- }
+ index(record, indexWriter, blockIndex);
+ indexCount++;
}
indexWriter.commit();
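The practical upshot for each indexed Document is a different retrieval key depending on whether the source file has a table of contents: with one, the block index plus the event ID (needed to pick the right record within the block); without one, the absolute byte offset as before. In outline, from index(...) above:

    // Retrieval key stored per document:
    if (blockIndex == null) {
        doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset(), Store.YES));
    } else {
        doc.add(new IntField(FieldNames.BLOCK_INDEX, blockIndex, Store.YES));
        doc.add(new LongField(SearchableFields.Identifier.getSearchableFieldName(), record.getEventId(), Store.YES));
    }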
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
index a7076d5..59dc10b 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
@@ -27,8 +27,8 @@ import java.util.List;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.search.SearchTerm;
-
import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanClause.Occur;
@@ -78,7 +78,16 @@ public class LuceneUtil {
final String searchString = baseName + ".";
for (final Path path : allProvenanceLogs) {
if (path.toFile().getName().startsWith(searchString)) {
- matchingFiles.add(path.toFile());
+ final File file = path.toFile();
+ if ( file.exists() ) {
+ matchingFiles.add(file);
+ } else {
+ final File dir = file.getParentFile();
+ final File gzFile = new File(dir, file.getName() + ".gz");
+ if ( gzFile.exists() ) {
+ matchingFiles.add(gzFile);
+ }
+ }
}
}
@@ -132,6 +141,19 @@ public class LuceneUtil {
return filenameComp;
}
+ final IndexableField fileOffset1 = o1.getField(FieldNames.BLOCK_INDEX);
+ final IndexableField fileOffset2 = o2.getField(FieldNames.BLOCK_INDEX);
+ if ( fileOffset1 != null && fileOffset2 != null ) {
+ final int blockIndexResult = Long.compare(fileOffset1.numericValue().longValue(), fileOffset2.numericValue().longValue());
+ if ( blockIndexResult != 0 ) {
+ return blockIndexResult;
+ }
+
+ final long eventId1 = o1.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
+ final long eventId2 = o2.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
+ return Long.compare(eventId1, eventId2);
+ }
+
final long offset1 = o1.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
final long offset2 = o2.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
return Long.compare(offset1, offset2);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
index 862bc2b..8bdc88a 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
@@ -20,12 +20,79 @@ import java.io.Closeable;
import java.io.IOException;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.toc.TocReader;
public interface RecordReader extends Closeable {
+ /**
+ * Returns the next record in the reader, or <code>null</code> if there is no more data available.
+ * @return the next record, or <code>null</code> if no more records are available
+ * @throws IOException if unable to read the next record from the stream
+ */
StandardProvenanceEventRecord nextRecord() throws IOException;
+ /**
+ * Skips the specified number of bytes
+ * @param bytesToSkip the number of bytes to skip ahead in the stream
+ * @throws IOException if unable to skip the requested number of bytes
+ */
void skip(long bytesToSkip) throws IOException;
+ /**
+ * Skips to the specified byte offset in the underlying stream.
+ * @param position the byte offset in the underlying stream to skip to
+ * @throws IOException if the underlying stream throws IOException, or if the reader has already
+ * passed the specified byte offset
+ */
void skipTo(long position) throws IOException;
+
+ /**
+ * Skips to the specified compression block
+ *
+ * @param blockIndex the index of the compression block to skip to
+ * @throws IOException if the underlying stream throws IOException, or if the reader has already
+ * read past the specified compression block index
+ * @throws IllegalStateException if the RecordReader does not have a TableOfContents associated with it
+ */
+ void skipToBlock(int blockIndex) throws IOException;
+
+ /**
+ * Returns the block index that the Reader is currently reading from.
+ * Note that the block index is incremented at the beginning of the {@link #nextRecord()}
+ * method, so when calling {@link #nextRecord()} repeatedly this method returns the block from
+ * which the previous record was read, not the block from which the next record will be read.
+ * @return the current block index
+ */
+ int getBlockIndex();
+
+ /**
+ * Returns <code>true</code> if the compression block index is available. It will be available
+ * if and only if the reader is created with a TableOfContents
+ *
+ * @return <code>true</code> if the block index is available, <code>false</code> otherwise
+ */
+ boolean isBlockIndexAvailable();
+
+ /**
+ * Returns the {@link TocReader} that is used to keep track of compression blocks, if one exists,
+ * <code>null</code> otherwise
+ * @return the TocReader used to track compression blocks, or <code>null</code> if none exists
+ */
+ TocReader getTocReader();
+
+ /**
+ * Returns the number of bytes that have been consumed from the stream (read or skipped).
+ * @return the number of bytes read or skipped so far
+ */
+ long getBytesConsumed();
+
+ /**
+ * Returns the ID of the last event in this record reader, or -1 if the reader has no records or
+ * has already read through all records. Note: This method will consume the stream until the end,
+ * so no more records will be available on this reader after calling this method.
+ *
+ * @return the ID of the last event, or -1 if there are no records
+ * @throws IOException if unable to read the remainder of the stream
+ */
+ long getMaxEventId() throws IOException;
}
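
A sketch of how these methods might combine to fetch one event: seek to the compression block recorded for the event at index time, then scan forward. Here journalFile, blockIndex, and eventId are assumed to be in scope, and getEventId() is assumed as the record's ID accessor:

    // hypothetical lookup: jump to the block that holds eventId, then scan within it
    try (final RecordReader reader = RecordReaders.newRecordReader(journalFile, null)) {
        if (reader.isBlockIndexAvailable()) {
            reader.skipToBlock(blockIndex); // block index previously stored in the Lucene index
        }
        StandardProvenanceEventRecord record;
        while ((record = reader.nextRecord()) != null) {
            if (record.getEventId() == eventId) {
                return record;
            }
        }
        return null; // not found in this journal
    }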
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
index 8f06995..dff281c 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.provenance.serialization;
-import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -24,82 +23,90 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Collection;
-import java.util.zip.GZIPInputStream;
-import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.provenance.StandardRecordReader;
import org.apache.nifi.provenance.lucene.LuceneUtil;
+import org.apache.nifi.provenance.toc.StandardTocReader;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.provenance.toc.TocUtil;
public class RecordReaders {
public static RecordReader newRecordReader(File file, final Collection<Path> provenanceLogFiles) throws IOException {
final File originalFile = file;
-
- if (!file.exists()) {
- if (provenanceLogFiles == null) {
- throw new FileNotFoundException(file.toString());
- }
-
- final String baseName = LuceneUtil.substringBefore(file.getName(), ".") + ".";
- for (final Path path : provenanceLogFiles) {
- if (path.toFile().getName().startsWith(baseName)) {
- file = path.toFile();
- break;
- }
- }
- }
-
InputStream fis = null;
- if ( file.exists() ) {
- try {
- fis = new FileInputStream(file);
- } catch (final FileNotFoundException fnfe) {
- fis = null;
- }
- }
-
- openStream: while ( fis == null ) {
- final File dir = file.getParentFile();
- final String baseName = LuceneUtil.substringBefore(file.getName(), ".");
-
- // depending on which rollover actions have occurred, we could have 3 possibilities for the
- // filename that we need. The majority of the time, we will use the extension ".prov.indexed.gz"
- // because most often we are compressing on rollover and most often we have already finished
- // compressing by the time that we are querying the data.
- for ( final String extension : new String[] {".indexed.prov.gz", ".indexed.prov", ".prov"} ) {
- file = new File(dir, baseName + extension);
- if ( file.exists() ) {
- try {
- fis = new FileInputStream(file);
- break openStream;
- } catch (final FileNotFoundException fnfe) {
- // file was modified by a RolloverAction after we verified that it exists but before we could
- // create an InputStream for it. Start over.
- fis = null;
- continue openStream;
- }
- }
- }
-
- break;
- }
- if ( fis == null ) {
- throw new FileNotFoundException("Unable to locate file " + originalFile);
- }
- final InputStream readableStream;
- if (file.getName().endsWith(".gz")) {
- readableStream = new BufferedInputStream(new GZIPInputStream(fis));
- } else {
- readableStream = new BufferedInputStream(fis);
+ try {
+ if (!file.exists()) {
+ if (provenanceLogFiles != null) {
+ final String baseName = LuceneUtil.substringBefore(file.getName(), ".") + ".";
+ for (final Path path : provenanceLogFiles) {
+ if (path.toFile().getName().startsWith(baseName)) {
+ file = path.toFile();
+ break;
+ }
+ }
+ }
+ }
+
+ if ( file.exists() ) {
+ try {
+ fis = new FileInputStream(file);
+ } catch (final FileNotFoundException fnfe) {
+ fis = null;
+ }
+ }
+
+ String filename = file.getName();
+ openStream: while ( fis == null ) {
+ final File dir = file.getParentFile();
+ final String baseName = LuceneUtil.substringBefore(file.getName(), ".");
+
+ // depending on which rollover actions have occurred, we could have 2 possibilities for the
+ // filename that we need. The majority of the time, we will use the extension ".prov.gz"
+ // because most often we are compressing on rollover and most often we have already finished
+ // compressing by the time that we are querying the data.
+ for ( final String extension : new String[] {".prov.gz", ".prov"} ) {
+ file = new File(dir, baseName + extension);
+ if ( file.exists() ) {
+ try {
+ fis = new FileInputStream(file);
+ filename = baseName + extension;
+ break openStream;
+ } catch (final FileNotFoundException fnfe) {
+ // file was modified by a RolloverAction after we verified that it exists but before we could
+ // create an InputStream for it. Start over.
+ fis = null;
+ continue openStream;
+ }
+ }
+ }
+
+ break;
+ }
+
+ if ( fis == null ) {
+ throw new FileNotFoundException("Unable to locate file " + originalFile);
+ }
+
+ final File tocFile = TocUtil.getTocFile(file);
+ if ( tocFile.exists() ) {
+ final TocReader tocReader = new StandardTocReader(tocFile);
+ return new StandardRecordReader(fis, filename, tocReader);
+ } else {
+ return new StandardRecordReader(fis, filename);
+ }
+ } catch (final IOException ioe) {
+ if ( fis != null ) {
+ try {
+ fis.close();
+ } catch (final IOException inner) {
+ ioe.addSuppressed(inner);
+ }
+ }
+
+ throw ioe;
}
-
- final DataInputStream dis = new DataInputStream(readableStream);
- @SuppressWarnings("unused")
- final String repoClassName = dis.readUTF();
- final int serializationVersion = dis.readInt();
-
- return new StandardRecordReader(dis, serializationVersion, file.getName());
}
}
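
With this change the factory transparently decides, per journal, whether block-based seeking is available: a reader is TOC-backed exactly when a sibling .toc file exists. A short check, using a hypothetical journal path:

    try (final RecordReader reader = RecordReaders.newRecordReader(new File("journals/1.prov"), null)) {
        final TocReader toc = reader.getTocReader();
        if (toc != null) {
            // block-based seeking (skipToBlock) is available for this journal
            System.out.println("journal compressed? " + toc.isCompressed());
        }
    }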
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
index de98ab9..58f4dc2 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.toc.TocWriter;
public interface RecordWriter extends Closeable {
@@ -82,4 +83,9 @@ public interface RecordWriter extends Closeable {
*/
void sync() throws IOException;
+ /**
+ * Returns the TOC Writer that is being used to write the Table of Contents for this journal
+ * @return the TocWriter being used, or <code>null</code> if no Table of Contents is being written
+ */
+ TocWriter getTocWriter();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
index 15349de..47b7c7e 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
@@ -20,11 +20,20 @@ import java.io.File;
import java.io.IOException;
import org.apache.nifi.provenance.StandardRecordWriter;
+import org.apache.nifi.provenance.toc.StandardTocWriter;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.toc.TocWriter;
public class RecordWriters {
+ private static final int DEFAULT_COMPRESSION_BLOCK_SIZE = 1024 * 1024; // 1 MB
- public static RecordWriter newRecordWriter(final File file) throws IOException {
- return new StandardRecordWriter(file);
+ public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc) throws IOException {
+ return newRecordWriter(file, compressed, createToc, DEFAULT_COMPRESSION_BLOCK_SIZE);
+ }
+
+ public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc, final int compressionBlockBytes) throws IOException {
+ final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null;
+ return new StandardRecordWriter(file, tocWriter, compressed, compressionBlockBytes);
}
}
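
A minimal usage sketch of the new factory: create a compressed journal with a Table of Contents (the path is hypothetical), then inspect the TOC writer. The 1 MB default block size applies unless the four-argument overload is used:

    final File journal = new File("journals/1.prov"); // hypothetical path
    final RecordWriter writer = RecordWriters.newRecordWriter(journal, true, true);
    try {
        final TocWriter toc = writer.getTocWriter(); // non-null because createToc was true
        System.out.println("writing TOC to " + toc.getFile());
    } finally {
        writer.close();
    }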
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
new file mode 100644
index 0000000..8944cec
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+/**
+ * Standard implementation of TocReader.
+ *
+ * Expects the .toc file to be in the following format:
+ *
+ * byte 0: version
+ * byte 1: boolean: compressionFlag -> 0 = journal is NOT compressed, 1 = journal is compressed
+ * byte 2-9: long: offset of block 0
+ * byte 10-17: long: offset of block 1
+ * ...
+ * byte (N*8+2)-(N*8+9): long: offset of block N
+ */
+public class StandardTocReader implements TocReader {
+ private final boolean compressed;
+ private final long[] offsets;
+
+ public StandardTocReader(final File file) throws IOException {
+ try (final FileInputStream fis = new FileInputStream(file);
+ final DataInputStream dis = new DataInputStream(fis)) {
+
+ final int version = dis.read();
+ if ( version < 0 ) {
+ throw new EOFException();
+ }
+
+ final int compressionFlag = dis.read();
+ if ( compressionFlag < 0 ) {
+ throw new EOFException();
+ }
+
+ if ( compressionFlag == 0 ) {
+ compressed = false;
+ } else if ( compressionFlag == 1 ) {
+ compressed = true;
+ } else {
+ throw new IOException("Table of Contents appears to be corrupt: could not read 'compression flag' from header; expected value of 0 or 1 but got " + compressionFlag);
+ }
+
+ final int numBlocks = (int) ((file.length() - 2) / 8);
+ offsets = new long[numBlocks];
+
+ for (int i=0; i < numBlocks; i++) {
+ offsets[i] = dis.readLong();
+ }
+ }
+ }
+
+ @Override
+ public boolean isCompressed() {
+ return compressed;
+ }
+
+ @Override
+ public long getBlockOffset(final int blockIndex) {
+ if ( blockIndex >= offsets.length ) {
+ return -1L;
+ }
+ return offsets[blockIndex];
+ }
+
+ @Override
+ public long getLastBlockOffset() {
+ if ( offsets.length == 0 ) {
+ return 0L;
+ }
+ return offsets[offsets.length - 1];
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public int getBlockIndex(final long blockOffset) {
+ for (int i=0; i < offsets.length; i++) {
+ if ( offsets[i] > blockOffset ) {
+ return i-1;
+ }
+ }
+
+ return offsets.length - 1;
+ }
+
+}
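
As a concrete illustration of getBlockIndex() above: with hypothetical block offsets {0, 100, 250}, byte offset 120 falls in block 1, because block 2 does not begin until byte 250:

    // hypothetical offsets for a three-block TOC: blocks begin at bytes 0, 100, and 250
    final long[] offsets = {0L, 100L, 250L};
    // linear scan mirroring StandardTocReader.getBlockIndex(120L)
    int index = offsets.length - 1;
    for (int i = 0; i < offsets.length; i++) {
        if (offsets[i] > 120L) {
            index = i - 1;
            break;
        }
    }
    // index is now 1: byte 120 lies inside the block that starts at offset 100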
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
new file mode 100644
index 0000000..488f225
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Standard implementation of {@link TocWriter}.
+ *
+ * Format of .toc file:
+ * byte 0: version
+ * byte 1: compressed: 0 -> not compressed, 1 -> compressed
+ * byte 2-9: long: offset of block 0
+ * byte 10-17: long: offset of block 1
+ * ...
+ * byte (N*8+2)-(N*8+9): long: offset of block N
+ */
+public class StandardTocWriter implements TocWriter {
+ private static final Logger logger = LoggerFactory.getLogger(StandardTocWriter.class);
+
+ public static final byte VERSION = 1;
+
+ private final File file;
+ private final FileOutputStream fos;
+ private final boolean alwaysSync;
+ private int index = -1;
+
+ /**
+ * Creates a StandardTocWriter that writes to the given file.
+ * @param file the file to write to
+ * @param compressionFlag whether or not the journal is compressed
+ * @param alwaysSync whether or not to force a disk sync after each update
+ * @throws IOException if unable to create or write to the file
+ */
+ public StandardTocWriter(final File file, final boolean compressionFlag, final boolean alwaysSync) throws IOException {
+ final File tocDir = file.getParentFile();
+ if ( !tocDir.exists() ) {
+ Files.createDirectories(tocDir.toPath());
+ }
+
+ this.file = file;
+ fos = new FileOutputStream(file);
+ this.alwaysSync = alwaysSync;
+
+ final byte[] header = new byte[2];
+ header[0] = VERSION;
+ header[1] = (byte) (compressionFlag ? 1 : 0);
+ fos.write(header);
+ fos.flush();
+
+ if ( alwaysSync ) {
+ sync();
+ }
+ }
+
+ @Override
+ public void addBlockOffset(final long offset) throws IOException {
+ final BufferedOutputStream bos = new BufferedOutputStream(fos);
+ final DataOutputStream dos = new DataOutputStream(bos);
+ dos.writeLong(offset);
+ dos.flush();
+ index++;
+ logger.debug("Adding block {} at offset {}", index, offset);
+
+ if ( alwaysSync ) {
+ sync();
+ }
+ }
+
+ @Override
+ public void sync() throws IOException {
+ fos.getFD().sync();
+ }
+
+ @Override
+ public int getCurrentBlockIndex() {
+ return index;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (alwaysSync) {
+ fos.getFD().sync();
+ }
+
+ fos.close();
+ }
+
+ @Override
+ public File getFile() {
+ return file;
+ }
+
+ @Override
+ public String toString() {
+ return "TOC Writer for " + file;
+ }
+}
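
A minimal round-trip sketch under the format above, writing a two-block TOC and reading it back (the path is hypothetical, and the enclosing method is assumed to declare IOException):

    final File tocFile = new File("toc/1.toc"); // hypothetical path; parent dir is created if missing
    try (final TocWriter writer = new StandardTocWriter(tocFile, true, false)) {
        writer.addBlockOffset(0L);    // block 0 begins at byte 0 of the journal
        writer.addBlockOffset(4096L); // block 1 begins at byte 4096
    }
    try (final TocReader reader = new StandardTocReader(tocFile)) {
        assert reader.isCompressed();
        assert reader.getBlockOffset(1) == 4096L;
    }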
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
new file mode 100644
index 0000000..7c197be
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.Closeable;
+
+/**
+ * <p>
+ * Reads a Table of Contents (.toc file) for a corresponding Journal File. We use a Table of Contents
+ * to map a Block Index to an offset into the Journal file where that Block begins. We do this so that
+ * we can persist a Block Index for an event and then compress the Journal later. This gives us good
+ * compression, because a large batch of events is compressed at once, while still supporting lookups:
+ * an event in an uncompressed Journal is found via the Table of Contents directly, and an event in a
+ * compressed Journal is found by simply rewriting the TOC while we compress the data.
+ * </p>
+ */
+public interface TocReader extends Closeable {
+
+ /**
+ * Indicates whether or not the corresponding Journal file is compressed
+ * @return <code>true</code> if the Journal file is compressed, <code>false</code> otherwise
+ */
+ boolean isCompressed();
+
+ /**
+ * Returns the byte offset into the Journal File for the Block with the given index.
+ * @param blockIndex the index of the Block
+ * @return the byte offset at which the Block begins, or -1 if the index is out of range
+ */
+ long getBlockOffset(int blockIndex);
+
+ /**
+ * Returns the byte offset into the Journal File of the last Block in the Table of Contents
+ * @return the byte offset of the last Block
+ */
+ long getLastBlockOffset();
+
+ /**
+ * Returns the index of the block that contains the given offset
+ * @param blockOffset a byte offset into the Journal File
+ * @return the index of the Block that contains the given offset
+ */
+ int getBlockIndex(long blockOffset);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
new file mode 100644
index 0000000..c30ac98
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.File;
+
+import org.apache.nifi.provenance.lucene.LuceneUtil;
+
+public class TocUtil {
+
+ /**
+ * Returns the file that should be used as the Table of Contents for the given Journal File
+ * @param journalFile the Journal File whose Table of Contents is desired
+ * @return the File that should be used as the Table of Contents for the given Journal File
+ */
+ public static File getTocFile(final File journalFile) {
+ final File tocDir = new File(journalFile.getParentFile(), "toc");
+ final String basename = LuceneUtil.substringBefore(journalFile.getName(), ".");
+ final File tocFile = new File(tocDir, basename + ".toc");
+ return tocFile;
+ }
+
+}
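
For example, under this convention a journal at /repo/prov/17.prov.gz (a hypothetical path) maps to /repo/prov/toc/17.toc:

    final File journal = new File("/repo/prov/17.prov.gz"); // hypothetical path
    final File toc = TocUtil.getTocFile(journal);
    // toc is /repo/prov/toc/17.toc: a "toc" subdirectory plus the basename before the first '.'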
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
new file mode 100644
index 0000000..c678053
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Writes a .toc file
+ */
+public interface TocWriter extends Closeable {
+
+ /**
+ * Adds the given block offset as the next Block Offset in the Table of Contents
+ * @param offset the byte offset at which the next Block begins
+ * @throws IOException if unable to persist the offset
+ */
+ void addBlockOffset(long offset) throws IOException;
+
+ /**
+ * Returns the index of the current Block
+ * @return the index of the current Block
+ */
+ int getCurrentBlockIndex();
+
+ /**
+ * Returns the file that is currently being written to
+ * @return the file that is being written to
+ */
+ File getFile();
+
+ /**
+ * Synchronizes the data with the underlying storage device
+ * @throws IOException if unable to sync with the underlying storage device
+ */
+ void sync() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 5be208b..25a363f 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.provenance;
+import static org.apache.nifi.provenance.TestUtil.createFlowFile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -25,14 +26,14 @@ import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.core.SimpleAnalyzer;
@@ -45,7 +46,6 @@ import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.FSDirectory;
import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.provenance.lineage.EventNode;
import org.apache.nifi.provenance.lineage.Lineage;
import org.apache.nifi.provenance.lineage.LineageEdge;
@@ -59,8 +59,10 @@ import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordReaders;
import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.file.FileUtils;
import org.junit.After;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
@@ -72,87 +74,47 @@ public class TestPersistentProvenanceRepository {
public TestName name = new TestName();
private PersistentProvenanceRepository repo;
+ private RepositoryConfiguration config;
public static final int DEFAULT_ROLLOVER_MILLIS = 2000;
private RepositoryConfiguration createConfiguration() {
- final RepositoryConfiguration config = new RepositoryConfiguration();
+ config = new RepositoryConfiguration();
config.addStorageDirectory(new File("target/storage/" + UUID.randomUUID().toString()));
- config.setCompressOnRollover(false);
+ config.setCompressOnRollover(true);
config.setMaxEventFileLife(2000L, TimeUnit.SECONDS);
+ config.setCompressionBlockBytes(100);
return config;
}
+ @BeforeClass
+ public static void setLogLevel() {
+ System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
+ }
+
@Before
public void printTestName() {
System.out.println("\n\n\n*********************** " + name.getMethodName() + " *****************************");
}
@After
- public void closeRepo() {
+ public void closeRepo() throws IOException {
if (repo != null) {
try {
repo.close();
} catch (final IOException ioe) {
}
}
+
+ // Delete all of the storage files. We do this in order to clean up the tons of files that
+ // we create but also to ensure that we have closed all of the file handles. If we leave any
+ // streams open, for instance, this will throw an IOException, causing our unit test to fail.
+ for ( final File storageDir : config.getStorageDirectories() ) {
+ FileUtils.deleteFile(storageDir, true);
+ }
}
- private FlowFile createFlowFile(final long id, final long fileSize, final Map<String, String> attributes) {
- final Map<String, String> attrCopy = new HashMap<>(attributes);
-
- return new FlowFile() {
- @Override
- public long getId() {
- return id;
- }
-
- @Override
- public long getEntryDate() {
- return System.currentTimeMillis();
- }
-
- @Override
- public Set<String> getLineageIdentifiers() {
- return new HashSet<String>();
- }
-
- @Override
- public long getLineageStartDate() {
- return System.currentTimeMillis();
- }
-
- @Override
- public Long getLastQueueDate() {
- return System.currentTimeMillis();
- }
-
- @Override
- public boolean isPenalized() {
- return false;
- }
-
- @Override
- public String getAttribute(final String s) {
- return attrCopy.get(s);
- }
-
- @Override
- public long getSize() {
- return fileSize;
- }
-
- @Override
- public Map<String, String> getAttributes() {
- return attrCopy;
- }
-
- @Override
- public int compareTo(final FlowFile o) {
- return 0;
- }
- };
- }
+
private EventReporter getEventReporter() {
return new EventReporter() {
@@ -261,6 +223,8 @@ public class TestPersistentProvenanceRepository {
repo.registerEvent(record);
}
+ Thread.sleep(1000L);
+
repo.close();
Thread.sleep(500L); // Give the repo time to shutdown (i.e., close all file handles, etc.)
@@ -417,10 +381,10 @@ public class TestPersistentProvenanceRepository {
@Test
public void testIndexAndCompressOnRolloverAndSubsequentSearch() throws IOException, InterruptedException, ParseException {
final RepositoryConfiguration config = createConfiguration();
- config.setMaxRecordLife(3, TimeUnit.SECONDS);
- config.setMaxStorageCapacity(1024L * 1024L);
+ config.setMaxRecordLife(30, TimeUnit.SECONDS);
+ config.setMaxStorageCapacity(1024L * 1024L * 10);
config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
- config.setMaxEventFileCapacity(1024L * 1024L);
+ config.setMaxEventFileCapacity(1024L * 1024L * 10);
config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
@@ -923,12 +887,16 @@ public class TestPersistentProvenanceRepository {
final PersistentProvenanceRepository secondRepo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
secondRepo.initialize(getEventReporter());
- final ProvenanceEventRecord event11 = builder.build();
- secondRepo.registerEvent(event11);
- secondRepo.waitForRollover();
- final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L);
- assertNotNull(event11Retrieved);
- assertEquals(10, event11Retrieved.getEventId());
+ try {
+ final ProvenanceEventRecord event11 = builder.build();
+ secondRepo.registerEvent(event11);
+ secondRepo.waitForRollover();
+ final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L);
+ assertNotNull(event11Retrieved);
+ assertEquals(10, event11Retrieved.getEventId());
+ } finally {
+ secondRepo.close();
+ }
}
@Test
@@ -998,6 +966,73 @@ public class TestPersistentProvenanceRepository {
storageDirFiles = config.getStorageDirectories().get(0).listFiles(indexFileFilter);
assertEquals(0, storageDirFiles.length);
}
+
+
+ @Test
+ public void testBackPressure() throws IOException, InterruptedException {
+ final RepositoryConfiguration config = createConfiguration();
+ config.setMaxEventFileCapacity(1L); // force rollover on each record.
+ config.setJournalCount(1);
+
+ final AtomicInteger journalCountRef = new AtomicInteger(0);
+
+ repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
+ @Override
+ protected int getJournalCount() {
+ return journalCountRef.get();
+ }
+ };
+ repo.initialize(getEventReporter());
+
+ final Map<String, String> attributes = new HashMap<>();
+ final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+ builder.setEventTime(System.currentTimeMillis());
+ builder.setEventType(ProvenanceEventType.RECEIVE);
+ builder.setTransitUri("nifi://unit-test");
+ attributes.put("uuid", UUID.randomUUID().toString());
+ builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+ builder.setComponentId("1234");
+ builder.setComponentType("dummy processor");
+
+ // ensure that we can register the events.
+ for (int i = 0; i < 10; i++) {
+ builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+ attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
+ repo.registerEvent(builder.build());
+ }
+
+ // set number of journals to 6 so that we will block.
+ journalCountRef.set(6);
+
+ final AtomicLong threadNanos = new AtomicLong(0L);
+ final Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ final long start = System.nanoTime();
+ builder.fromFlowFile(createFlowFile(13, 3000L, attributes));
+ attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 13);
+ repo.registerEvent(builder.build());
+ threadNanos.set(System.nanoTime() - start);
+ }
+ });
+ t.start();
+
+ Thread.sleep(1500L);
+
+ journalCountRef.set(1);
+ t.join();
+
+ final int threadMillis = (int) TimeUnit.NANOSECONDS.toMillis(threadNanos.get());
+ assertTrue(threadMillis > 1200); // use 1200 to account for the fact that the timing is not exact
+
+ builder.fromFlowFile(createFlowFile(15, 3000L, attributes));
+ attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 15);
+ repo.registerEvent(builder.build());
+ }
+
+
+ // TODO: test EOF on merge
+ // TODO: Test journal with no records
@Test
public void testTextualQuery() throws InterruptedException, IOException, ParseException {
[07/10] incubator-nifi git commit: Merge branch 'develop' of
http://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
Posted by ma...@apache.org.
Merge branch 'develop' of http://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/c10facae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/c10facae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/c10facae
Branch: refs/heads/develop
Commit: c10facae487a900af69df89bf393d413d43725bc
Parents: 953d9e5 5481889
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Apr 27 09:29:19 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Apr 27 10:47:07 2015 -0400
----------------------------------------------------------------------
.../apache/nifi/processors/aws/sns/PutSNS.java | 5 +-
.../apache/nifi/processors/aws/sqs/GetSQS.java | 3 +-
.../org/apache/nifi/processors/GeoEnrichIP.java | 38 +-
.../nifi/processors/maxmind/DatabaseReader.java | 44 +-
.../nifi-hl7-bundle/nifi-hl7-processors/pom.xml | 114 +--
.../processors/hl7/ExtractHL7Attributes.java | 365 +++++----
.../apache/nifi/processors/hl7/RouteHL7.java | 300 +++----
.../hl7/TestExtractHL7Attributes.java | 30 +-
.../apache/nifi/processors/kafka/GetKafka.java | 438 +++++-----
.../apache/nifi/processors/kafka/PutKafka.java | 285 +++----
.../additionalDetails.html | 28 +-
.../additionalDetails.html | 30 +-
.../nifi/processors/kafka/TestGetKafka.java | 58 +-
.../nifi/processors/kafka/TestPutKafka.java | 189 ++---
nifi/nifi-nar-bundles/nifi-kafka-bundle/pom.xml | 14 +-
.../processors/kite/AbstractKiteProcessor.java | 10 +-
.../apache/nifi/processors/kite/AvroUtil.java | 18 +-
.../nifi/processors/kite/ConvertCSVToAvro.java | 348 ++++----
.../nifi/processors/kite/ConvertJSONToAvro.java | 2 +-
.../processors/kite/StoreInKiteDataset.java | 208 ++---
.../processors/kite/TestCSVToAvroProcessor.java | 181 ++---
.../kite/TestConfigurationProperty.java | 65 +-
.../nifi/processors/kite/TestGetSchema.java | 117 ++-
.../kite/TestJSONToAvroProcessor.java | 58 +-
.../kite/TestKiteProcessorsCluster.java | 125 ++-
.../kite/TestKiteStorageProcessor.java | 225 +++--
.../apache/nifi/processors/kite/TestUtil.java | 114 +--
.../nifi-yandex-processors/pom.xml | 4 +-
.../nifi/processors/yandex/YandexTranslate.java | 381 +++++----
.../processors/yandex/model/Translation.java | 55 +-
.../nifi/processors/yandex/util/Languages.java | 124 +--
.../processors/yandex/TestYandexTranslate.java | 317 ++++----
.../nifi-language-translation-bundle/pom.xml | 28 +-
.../nifi-twitter-processors/pom.xml | 10 +-
.../nifi/processors/twitter/GetTwitter.java | 450 +++++-----
.../web/StandardContentViewerController.java | 9 +-
.../standard/AbstractJsonPathProcessor.java | 19 +-
.../standard/Base64EncodeContent.java | 11 +-
.../nifi/processors/standard/BinFiles.java | 170 ++--
.../processors/standard/CompressContent.java | 38 +-
.../nifi/processors/standard/ControlRate.java | 39 +-
.../standard/ConvertCharacterSet.java | 37 +-
.../processors/standard/DetectDuplicate.java | 28 +-
.../processors/standard/DistributeLoad.java | 89 +-
.../nifi/processors/standard/EncodeContent.java | 27 +-
.../processors/standard/EncryptContent.java | 23 +-
.../processors/standard/EvaluateJsonPath.java | 91 ++-
.../standard/EvaluateRegularExpression.java | 31 +-
.../nifi/processors/standard/EvaluateXPath.java | 89 +-
.../processors/standard/EvaluateXQuery.java | 63 +-
.../processors/standard/ExecuteProcess.java | 19 +-
.../standard/ExecuteStreamCommand.java | 71 +-
.../nifi/processors/standard/ExtractText.java | 26 +-
.../processors/standard/GenerateFlowFile.java | 10 +-
.../apache/nifi/processors/standard/GetFTP.java | 14 +-
.../nifi/processors/standard/GetFile.java | 32 +-
.../processors/standard/GetFileTransfer.java | 9 +-
.../nifi/processors/standard/GetHTTP.java | 69 +-
.../nifi/processors/standard/GetJMSTopic.java | 50 +-
.../nifi/processors/standard/GetSFTP.java | 16 +-
.../processors/standard/HandleHttpRequest.java | 468 ++++++-----
.../processors/standard/HandleHttpResponse.java | 127 +--
.../nifi/processors/standard/HashAttribute.java | 38 +-
.../nifi/processors/standard/HashContent.java | 18 +-
.../processors/standard/IdentifyMimeType.java | 17 +-
.../nifi/processors/standard/InvokeHTTP.java | 149 ++--
.../nifi/processors/standard/JmsConsumer.java | 100 +--
.../nifi/processors/standard/ListenHTTP.java | 18 +-
.../nifi/processors/standard/ListenUDP.java | 133 ++-
.../nifi/processors/standard/LogAttribute.java | 5 +-
.../nifi/processors/standard/MergeContent.java | 180 ++--
.../nifi/processors/standard/ModifyBytes.java | 3 -
.../processors/standard/MonitorActivity.java | 27 +-
.../nifi/processors/standard/PostHTTP.java | 152 ++--
.../nifi/processors/standard/PutEmail.java | 423 +++++-----
.../apache/nifi/processors/standard/PutFTP.java | 56 +-
.../nifi/processors/standard/PutFile.java | 264 +++---
.../processors/standard/PutFileTransfer.java | 127 ++-
.../apache/nifi/processors/standard/PutJMS.java | 133 ++-
.../nifi/processors/standard/PutSFTP.java | 9 +-
.../nifi/processors/standard/ReplaceText.java | 163 ++--
.../standard/ReplaceTextWithMapping.java | 215 +++--
.../processors/standard/RouteOnAttribute.java | 111 ++-
.../processors/standard/RouteOnContent.java | 149 ++--
.../nifi/processors/standard/ScanAttribute.java | 115 ++-
.../nifi/processors/standard/ScanContent.java | 83 +-
.../processors/standard/SegmentContent.java | 75 +-
.../nifi/processors/standard/SplitContent.java | 163 ++--
.../nifi/processors/standard/SplitJson.java | 74 +-
.../nifi/processors/standard/SplitText.java | 142 ++--
.../nifi/processors/standard/SplitXml.java | 61 +-
.../nifi/processors/standard/TransformXml.java | 121 +--
.../nifi/processors/standard/UnpackContent.java | 216 +++--
.../nifi/processors/standard/ValidateXml.java | 45 +-
.../servlets/ContentAcknowledgmentServlet.java | 53 +-
.../standard/servlets/ListenHTTPServlet.java | 156 ++--
.../nifi/processors/standard/util/Bin.java | 6 +-
.../processors/standard/util/BinManager.java | 26 +-
.../standard/util/DocumentReaderCallback.java | 3 +-
.../processors/standard/util/FTPTransfer.java | 351 +++++---
.../nifi/processors/standard/util/FTPUtils.java | 38 +-
.../nifi/processors/standard/util/FileInfo.java | 3 +-
.../processors/standard/util/FileTransfer.java | 356 ++++----
.../processors/standard/util/JmsFactory.java | 127 ++-
.../standard/util/JmsProcessingSummary.java | 100 ++-
.../processors/standard/util/JmsProperties.java | 257 +++---
.../util/JsonPathExpressionValidator.java | 107 ++-
.../standard/util/SFTPConnection.java | 9 -
.../processors/standard/util/SFTPTransfer.java | 342 +++++---
.../processors/standard/util/SFTPUtils.java | 179 ++--
.../standard/util/UDPStreamConsumer.java | 25 +-
.../util/ValidatingBase32InputStream.java | 1 -
.../standard/util/WrappedMessageConsumer.java | 9 +-
.../standard/util/WrappedMessageProducer.java | 9 +-
.../standard/util/XmlSplitterSaxParser.java | 11 +-
.../additionalDetails.html | 6 +-
.../additionalDetails.html | 2 +-
.../org/apache/tika/mime/custom-mimetypes.xml | 144 ++--
.../src/test/java/TestIngestAndUpdate.java | 3 +-
.../processors/standard/CaptureServlet.java | 12 +-
.../processors/standard/HelloWorldServlet.java | 3 +-
.../standard/RESTServiceContentModified.java | 15 +-
.../standard/TestBase64EncodeContent.java | 41 +-
.../standard/TestCompressContent.java | 96 ++-
.../processors/standard/TestControlRate.java | 6 +-
.../standard/TestConvertCharacterSet.java | 13 +-
.../standard/TestDetectDuplicate.java | 36 +-
.../processors/standard/TestDistributeLoad.java | 21 +-
.../processors/standard/TestEncodeContent.java | 66 +-
.../processors/standard/TestEncryptContent.java | 29 +-
.../standard/TestEvaluateJsonPath.java | 222 +++--
.../processors/standard/TestEvaluateXPath.java | 105 ++-
.../processors/standard/TestEvaluateXQuery.java | 312 ++++---
.../processors/standard/TestExecuteProcess.java | 36 +-
.../standard/TestExecuteStreamCommand.java | 131 ++-
.../processors/standard/TestExtractText.java | 104 ++-
.../nifi/processors/standard/TestGetFile.java | 63 +-
.../nifi/processors/standard/TestGetHTTP.java | 112 +--
.../processors/standard/TestGetJMSQueue.java | 63 +-
.../standard/TestHandleHttpRequest.java | 49 +-
.../standard/TestHandleHttpResponse.java | 109 ++-
.../processors/standard/TestHashAttribute.java | 6 +-
.../processors/standard/TestHashContent.java | 4 +-
.../standard/TestIdentifyMimeType.java | 15 +-
.../processors/standard/TestInvokeHTTP.java | 94 ++-
.../processors/standard/TestJmsConsumer.java | 224 ++---
.../nifi/processors/standard/TestListenUDP.java | 38 +-
.../processors/standard/TestMergeContent.java | 180 ++--
.../processors/standard/TestModifyBytes.java | 81 +-
.../standard/TestMonitorActivity.java | 78 +-
.../nifi/processors/standard/TestPostHTTP.java | 189 +++--
.../nifi/processors/standard/TestPutEmail.java | 84 +-
.../processors/standard/TestReplaceText.java | 81 +-
.../standard/TestReplaceTextLineByLine.java | 203 +++--
.../standard/TestReplaceTextWithMapping.java | 318 +++++---
.../standard/TestRouteOnAttribute.java | 64 +-
.../processors/standard/TestRouteOnContent.java | 18 +-
.../processors/standard/TestScanAttribute.java | 15 +-
.../processors/standard/TestScanContent.java | 34 +-
.../processors/standard/TestSegmentContent.java | 13 +-
.../nifi/processors/standard/TestServer.java | 84 +-
.../processors/standard/TestSplitContent.java | 213 +++--
.../nifi/processors/standard/TestSplitJson.java | 91 ++-
.../nifi/processors/standard/TestSplitText.java | 70 +-
.../nifi/processors/standard/TestSplitXml.java | 3 +-
.../processors/standard/TestTransformXml.java | 40 +-
.../processors/standard/TestUnpackContent.java | 129 ++-
.../processors/standard/TestValidateXml.java | 3 +-
.../test/resources/TestJson/json-sample.json | 814 +++++++++----------
.../ControllerStatusReportingTask.java | 19 +-
.../apache/nifi/controller/MonitorMemory.java | 3 +-
.../processors/attributes/UpdateAttribute.java | 59 +-
.../update/attributes/api/RuleResource.java | 10 +-
173 files changed, 9831 insertions(+), 7454 deletions(-)
----------------------------------------------------------------------
[02/10] incubator-nifi git commit: Merge branch 'develop' of
http://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
Posted by ma...@apache.org.
Merge branch 'develop' of http://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/f442d55c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/f442d55c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/f442d55c
Branch: refs/heads/develop
Commit: f442d55c91d80091c7d2dbf8c5104d6aefcd31c2
Parents: b097a53 5481889
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Apr 27 09:29:19 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Apr 27 09:29:19 2015 -0400
----------------------------------------------------------------------
173 files changed, 9831 insertions(+), 7454 deletions(-)
----------------------------------------------------------------------
[03/10] incubator-nifi git commit: NIFI-527: Merging develop
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
new file mode 100644
index 0000000..6f85b94
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import static org.apache.nifi.provenance.TestUtil.createFlowFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.nifi.provenance.toc.StandardTocReader;
+import org.apache.nifi.provenance.toc.StandardTocWriter;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestStandardRecordReaderWriter {
+ @BeforeClass
+ public static void setLogLevel() {
+ System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
+ }
+
+ private ProvenanceEventRecord createEvent() {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("filename", "1.txt");
+ attributes.put("uuid", UUID.randomUUID().toString());
+
+ final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+ builder.setEventTime(System.currentTimeMillis());
+ builder.setEventType(ProvenanceEventType.RECEIVE);
+ builder.setTransitUri("nifi://unit-test");
+ builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+ builder.setComponentId("1234");
+ builder.setComponentType("dummy processor");
+ final ProvenanceEventRecord record = builder.build();
+
+ return record;
+ }
+
+ @Test
+ public void testSimpleWriteWithToc() throws IOException {
+ final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite");
+ final File tocFile = TocUtil.getTocFile(journalFile);
+ final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
+ final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, false, 1024 * 1024);
+
+ writer.writeHeader();
+ writer.writeRecord(createEvent(), 1L);
+ writer.close();
+
+ final TocReader tocReader = new StandardTocReader(tocFile);
+
+ try (final FileInputStream fis = new FileInputStream(journalFile);
+ final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+ assertEquals(0, reader.getBlockIndex());
+ reader.skipToBlock(0);
+ StandardProvenanceEventRecord recovered = reader.nextRecord();
+ assertNotNull(recovered);
+
+ assertEquals("nifi://unit-test", recovered.getTransitUri());
+ assertNull(reader.nextRecord());
+ }
+
+ FileUtils.deleteFile(journalFile.getParentFile(), true);
+ }
+
+
+ @Test
+ public void testSingleRecordCompressed() throws IOException {
+ final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
+ final File tocFile = TocUtil.getTocFile(journalFile);
+ final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
+ final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100);
+
+ writer.writeHeader();
+ writer.writeRecord(createEvent(), 1L);
+ writer.close();
+
+ final TocReader tocReader = new StandardTocReader(tocFile);
+
+ try (final FileInputStream fis = new FileInputStream(journalFile);
+ final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+ assertEquals(0, reader.getBlockIndex());
+ reader.skipToBlock(0);
+ StandardProvenanceEventRecord recovered = reader.nextRecord();
+ assertNotNull(recovered);
+
+ assertEquals("nifi://unit-test", recovered.getTransitUri());
+ assertNull(reader.nextRecord());
+ }
+
+ FileUtils.deleteFile(journalFile.getParentFile(), true);
+ }
+
+
+ @Test
+ public void testMultipleRecordsSameBlockCompressed() throws IOException {
+ final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
+ final File tocFile = TocUtil.getTocFile(journalFile);
+ final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
+ // new block each 1 MB of uncompressed data
+ final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 1024 * 1024);
+
+ writer.writeHeader();
+ for (int i=0; i < 10; i++) {
+ writer.writeRecord(createEvent(), i);
+ }
+ writer.close();
+
+ final TocReader tocReader = new StandardTocReader(tocFile);
+
+ try (final FileInputStream fis = new FileInputStream(journalFile);
+ final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+ for (int i=0; i < 10; i++) {
+ assertEquals(0, reader.getBlockIndex());
+
+ // call skipToBlock on some iterations to verify that it works; skip it on
+ // the others to verify that records can still be read without it.
+ if (i <= 5) {
+ reader.skipToBlock(0);
+ }
+
+ StandardProvenanceEventRecord recovered = reader.nextRecord();
+ assertNotNull(recovered);
+ assertEquals("nifi://unit-test", recovered.getTransitUri());
+ }
+
+ assertNull(reader.nextRecord());
+ }
+
+ FileUtils.deleteFile(journalFile.getParentFile(), true);
+ }
+
+
+ @Test
+ public void testMultipleRecordsMultipleBlocksCompressed() throws IOException {
+ final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
+ final File tocFile = TocUtil.getTocFile(journalFile);
+ final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
+ // new block each 100 bytes
+ final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100);
+
+ writer.writeHeader();
+ for (int i=0; i < 10; i++) {
+ writer.writeRecord(createEvent(), i);
+ }
+ writer.close();
+
+ final TocReader tocReader = new StandardTocReader(tocFile);
+
+ try (final FileInputStream fis = new FileInputStream(journalFile);
+ final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+ for (int i=0; i < 10; i++) {
+ StandardProvenanceEventRecord recovered = reader.nextRecord();
+ System.out.println(recovered);
+ assertNotNull(recovered);
+ assertEquals((long) i, recovered.getEventId());
+ assertEquals("nifi://unit-test", recovered.getTransitUri());
+ }
+
+ assertNull(reader.nextRecord());
+ }
+
+ FileUtils.deleteFile(journalFile.getParentFile(), true);
+ }
+}
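Editor's note: every test above follows the same round trip: write a header plus records through a StandardRecordWriter backed by a TocWriter, then read them back through a StandardRecordReader backed by the matching TocReader. A minimal sketch of that pattern, assuming only the constructor signatures used in this commit (the final StandardRecordWriter argument is the uncompressed block size in bytes, so 100 forces a new block per record while 1024 * 1024 keeps all ten records of these tests in one block):

package org.apache.nifi.provenance;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.nifi.provenance.toc.StandardTocReader;
import org.apache.nifi.provenance.toc.StandardTocWriter;
import org.apache.nifi.provenance.toc.TocReader;
import org.apache.nifi.provenance.toc.TocUtil;
import org.apache.nifi.provenance.toc.TocWriter;

public class RecordRoundTripSketch {
    // writes a single event, then replays the journal until nextRecord() returns null
    static void roundTrip(final File journalFile, final ProvenanceEventRecord event) throws IOException {
        final File tocFile = TocUtil.getTocFile(journalFile);
        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
        final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 1024 * 1024);
        writer.writeHeader();
        writer.writeRecord(event, 1L);
        writer.close();

        final TocReader tocReader = new StandardTocReader(tocFile);
        try (final FileInputStream fis = new FileInputStream(journalFile);
                final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
            StandardProvenanceEventRecord recovered;
            while ((recovered = reader.nextRecord()) != null) {
                System.out.println(recovered.getTransitUri());
            }
        }
    }
}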
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
new file mode 100644
index 0000000..7459fe8
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+public class TestUtil {
+ public static FlowFile createFlowFile(final long id, final long fileSize, final Map<String, String> attributes) {
+ final Map<String, String> attrCopy = new HashMap<>(attributes);
+
+ return new FlowFile() {
+ @Override
+ public long getId() {
+ return id;
+ }
+
+ @Override
+ public long getEntryDate() {
+ return System.currentTimeMillis();
+ }
+
+ @Override
+ public Set<String> getLineageIdentifiers() {
+ return new HashSet<String>();
+ }
+
+ @Override
+ public long getLineageStartDate() {
+ return System.currentTimeMillis();
+ }
+
+ @Override
+ public Long getLastQueueDate() {
+ return System.currentTimeMillis();
+ }
+
+ @Override
+ public boolean isPenalized() {
+ return false;
+ }
+
+ @Override
+ public String getAttribute(final String s) {
+ return attrCopy.get(s);
+ }
+
+ @Override
+ public long getSize() {
+ return fileSize;
+ }
+
+ @Override
+ public Map<String, String> getAttributes() {
+ return attrCopy;
+ }
+
+ @Override
+ public int compareTo(final FlowFile o) {
+ return 0;
+ }
+ };
+ }
+}
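Editor's note: the anonymous stub above keeps this test module free of any mocking framework; the id, size, and attribute map are the only caller-controlled values, while the date accessors simply return the current time. Typical usage, as in TestStandardRecordReaderWriter above:

final Map<String, String> attributes = new HashMap<>();
attributes.put("uuid", UUID.randomUUID().toString());
final FlowFile flowFile = TestUtil.createFlowFile(3L, 3000L, attributes);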
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
new file mode 100644
index 0000000..30326e7
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.UUID;
+
+import org.junit.Test;
+
+public class TestStandardTocReader {
+
+ @Test
+ public void testDetectsCompression() throws IOException {
+ final File file = new File("target/" + UUID.randomUUID().toString());
+ try (final OutputStream out = new FileOutputStream(file)) {
+ out.write(0);
+ out.write(0);
+ }
+
+ try {
+ try (final StandardTocReader reader = new StandardTocReader(file)) {
+ assertFalse(reader.isCompressed());
+ }
+ } finally {
+ file.delete();
+ }
+
+
+ try (final OutputStream out = new FileOutputStream(file)) {
+ out.write(0);
+ out.write(1);
+ }
+
+ try {
+ try (final StandardTocReader reader = new StandardTocReader(file)) {
+ assertTrue(reader.isCompressed());
+ }
+ } finally {
+ file.delete();
+ }
+ }
+
+
+ @Test
+ public void testGetBlockIndex() throws IOException {
+ final File file = new File("target/" + UUID.randomUUID().toString());
+ try (final OutputStream out = new FileOutputStream(file);
+ final DataOutputStream dos = new DataOutputStream(out)) {
+ out.write(0);
+ out.write(0);
+
+ for (int i=0; i < 1024; i++) {
+ dos.writeLong(i * 1024L);
+ }
+ }
+
+ try {
+ try (final StandardTocReader reader = new StandardTocReader(file)) {
+ assertFalse(reader.isCompressed());
+
+ for (int i=0; i < 1024; i++) {
+ assertEquals(i * 1024, reader.getBlockOffset(i));
+ }
+ }
+ } finally {
+ file.delete();
+ }
+ }
+}
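Editor's note: taken together, these tests imply the on-disk TOC layout: a two-byte header whose second byte is a compression flag (0 or 1), followed by consecutive big-endian 8-byte block offsets. A standalone sketch of a reader for that inferred layout (an inference from the tests, not the actual StandardTocReader implementation):

import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

public class TocLayoutSketch {
    // returns the byte offset of the given block, per the layout exercised above
    public static long readBlockOffset(final File tocFile, final int blockIndex) throws IOException {
        try (final DataInputStream dis = new DataInputStream(new FileInputStream(tocFile))) {
            dis.readByte();                                 // header byte 1 (presumably a version marker)
            final boolean compressed = dis.readByte() == 1; // header byte 2: compression flag
            final long toSkip = blockIndex * 8L;            // each offset is one big-endian long
            if (dis.skip(toSkip) != toSkip) {
                throw new IOException("TOC file is truncated");
            }
            return dis.readLong();
        }
    }
}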
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5ac48a0/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
new file mode 100644
index 0000000..70f55a2
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.Test;
+
+public class TestStandardTocWriter {
+ @Test
+ public void testOverwriteEmptyFile() throws IOException {
+ final File tocFile = new File("target/" + UUID.randomUUID().toString() + ".toc");
+ try {
+ assertTrue(tocFile.createNewFile());
+
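+ // opening and closing the writer against the pre-existing zero-byte file should not throw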
+ try (final StandardTocWriter writer = new StandardTocWriter(tocFile, false, false)) {
+ }
+ } finally {
+ FileUtils.deleteFile(tocFile, false);
+ }
+ }
+
+}
[09/10] incubator-nifi git commit: NIFI-549: Fixed NPE
Posted by ma...@apache.org.
NIFI-549: Fixed NPE
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/ba96e43a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/ba96e43a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/ba96e43a
Branch: refs/heads/develop
Commit: ba96e43a8e0fa682e9c803228d292969ffd0c686
Parents: 0759660
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Apr 27 12:01:36 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Apr 27 12:01:36 2015 -0400
----------------------------------------------------------------------
.../java/org/apache/nifi/processors/GeoEnrichIP.java | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ba96e43a/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
index be03243..1ecb221 100644
--- a/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
+++ b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
@@ -189,8 +189,17 @@ public class GeoEnrichIP extends AbstractProcessor {
final Map<String, String> attrs = new HashMap<>();
attrs.put(new StringBuilder(ipAttributeName).append(".geo.lookup.micros").toString(), String.valueOf(stopWatch.getDuration(TimeUnit.MICROSECONDS)));
attrs.put(new StringBuilder(ipAttributeName).append(".geo.city").toString(), response.getCity().getName());
- attrs.put(new StringBuilder(ipAttributeName).append(".geo.latitude").toString(), response.getLocation().getLatitude().toString());
- attrs.put(new StringBuilder(ipAttributeName).append(".geo.longitude").toString(), response.getLocation().getLongitude().toString());
+
+ final Double latitude = response.getLocation().getLatitude();
+ if ( latitude != null ) {
+ attrs.put(new StringBuilder(ipAttributeName).append(".geo.latitude").toString(), latitude.toString());
+ }
+
+ final Double longitude = response.getLocation().getLongitude();
+ if ( longitude != null ) {
+ attrs.put(new StringBuilder(ipAttributeName).append(".geo.longitude").toString(), longitude.toString());
+ }
+
int i = 0;
for (final Subdivision subd : response.getSubdivisions()) {
attrs.put(new StringBuilder(ipAttributeName).append(".geo.subdivision.").append(i).toString(), subd.getName());
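Editor's note: the fix guards each nullable GeoIP2 field individually. A small helper capturing the same pattern (illustrative only; the helper name is not part of the commit):

import java.util.Map;

// add the attribute only when the looked-up value is present; latitude and
// longitude can be null in the GeoIP2 response, which caused the NPE fixed above
static void putIfPresent(final Map<String, String> attrs, final String key, final Object value) {
    if (value != null) {
        attrs.put(key, value.toString());
    }
}

// usage, mirroring the patched lines:
// putIfPresent(attrs, ipAttributeName + ".geo.latitude", response.getLocation().getLatitude());
// putIfPresent(attrs, ipAttributeName + ".geo.longitude", response.getLocation().getLongitude());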
[10/10] incubator-nifi git commit: Merge branch 'develop' of http://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
Posted by ma...@apache.org.
Merge branch 'develop' of http://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/666de3d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/666de3d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/666de3d4
Branch: refs/heads/develop
Commit: 666de3d410a8f80a8ad4a90e5fbcce08c777103e
Parents: ba96e43 d29a2d6
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Apr 27 12:04:35 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Apr 27 12:04:35 2015 -0400
----------------------------------------------------------------------
.../standard/AbstractJsonPathProcessor.java | 11 +-
.../standard/ConvertCharacterSet.java | 2 +-
.../nifi/processors/standard/HashAttribute.java | 34 +-
.../nifi/processors/standard/PutEmail.java | 302 +++++++---------
.../apache/nifi/processors/standard/PutFTP.java | 24 +-
.../nifi/processors/standard/PutFile.java | 275 ++++++--------
.../processors/standard/PutFileTransfer.java | 140 +++-----
.../apache/nifi/processors/standard/PutJMS.java | 161 +++------
.../nifi/processors/standard/ReplaceText.java | 169 ++++-----
.../standard/ReplaceTextWithMapping.java | 231 +++++-------
.../processors/standard/RouteOnAttribute.java | 136 +++----
.../processors/standard/RouteOnContent.java | 147 +++-----
.../nifi/processors/standard/ScanAttribute.java | 121 +++----
.../nifi/processors/standard/ScanContent.java | 86 ++---
.../processors/standard/SegmentContent.java | 54 ++-
.../nifi/processors/standard/SplitContent.java | 125 +++----
.../nifi/processors/standard/SplitJson.java | 77 ++--
.../nifi/processors/standard/SplitText.java | 142 ++++----
.../nifi/processors/standard/SplitXml.java | 70 ++--
.../nifi/processors/standard/TransformXml.java | 87 ++---
.../nifi/processors/standard/UnpackContent.java | 215 +++++------
.../servlets/ContentAcknowledgmentServlet.java | 55 +--
.../standard/servlets/ListenHTTPServlet.java | 142 +++-----
.../nifi/processors/standard/util/Bin.java | 22 +-
.../processors/standard/util/BinManager.java | 41 +--
.../standard/util/DocumentReaderCallback.java | 6 +-
.../processors/standard/util/FTPTransfer.java | 351 +++++++-----------
.../nifi/processors/standard/util/FTPUtils.java | 95 ++---
.../nifi/processors/standard/util/FileInfo.java | 3 +-
.../processors/standard/util/FileTransfer.java | 356 +++++++++----------
.../processors/standard/util/JmsFactory.java | 128 +++----
.../processors/standard/util/JmsProperties.java | 256 ++++++-------
.../util/JsonPathExpressionValidator.java | 27 +-
.../standard/util/NLKBufferedReader.java | 14 +-
.../processors/standard/util/SFTPTransfer.java | 351 +++++++-----------
.../processors/standard/util/SFTPUtils.java | 167 ++++-----
.../standard/util/UDPStreamConsumer.java | 25 +-
.../util/ValidatingBase32InputStream.java | 3 +-
.../util/ValidatingBase64InputStream.java | 3 +-
.../standard/util/WrappedMessageConsumer.java | 9 +-
.../standard/util/WrappedMessageProducer.java | 9 +-
.../src/test/java/TestIngestAndUpdate.java | 3 +-
.../processors/standard/CaptureServlet.java | 3 +-
.../standard/RESTServiceContentModified.java | 15 +-
.../standard/TestBase64EncodeContent.java | 42 +--
.../standard/TestCompressContent.java | 85 ++---
.../processors/standard/TestControlRate.java | 3 +-
.../standard/TestConvertCharacterSet.java | 13 +-
.../standard/TestDetectDuplicate.java | 33 +-
.../processors/standard/TestDistributeLoad.java | 19 +-
.../processors/standard/TestEncodeContent.java | 66 ++--
.../processors/standard/TestEncryptContent.java | 30 +-
.../standard/TestEvaluateJsonPath.java | 219 ++++--------
.../processors/standard/TestEvaluateXPath.java | 106 ++----
.../processors/standard/TestEvaluateXQuery.java | 312 +++++-----------
.../processors/standard/TestExecuteProcess.java | 18 +-
.../standard/TestExecuteStreamCommand.java | 135 +++----
.../processors/standard/TestExtractText.java | 81 ++---
.../nifi/processors/standard/TestGetFile.java | 63 ++--
.../nifi/processors/standard/TestGetHTTP.java | 57 +--
.../processors/standard/TestGetJMSQueue.java | 63 ++--
.../standard/TestHandleHttpRequest.java | 19 +-
.../standard/TestHandleHttpResponse.java | 81 ++---
.../processors/standard/TestHashAttribute.java | 8 +-
.../processors/standard/TestHashContent.java | 5 +-
.../standard/TestIdentifyMimeType.java | 16 +-
.../processors/standard/TestInvokeHTTP.java | 137 ++-----
.../processors/standard/TestJmsConsumer.java | 88 ++---
.../nifi/processors/standard/TestListenUDP.java | 39 +-
.../processors/standard/TestMergeContent.java | 176 +++------
.../processors/standard/TestModifyBytes.java | 82 ++---
.../standard/TestMonitorActivity.java | 84 ++---
.../nifi/processors/standard/TestPostHTTP.java | 102 ++----
.../nifi/processors/standard/TestPutEmail.java | 17 +-
.../processors/standard/TestReplaceText.java | 81 ++---
.../standard/TestReplaceTextLineByLine.java | 204 ++++-------
.../standard/TestReplaceTextWithMapping.java | 316 +++++-----------
.../standard/TestRouteOnAttribute.java | 65 ++--
.../processors/standard/TestRouteOnContent.java | 19 +-
.../processors/standard/TestScanAttribute.java | 16 +-
.../processors/standard/TestScanContent.java | 34 +-
.../processors/standard/TestSegmentContent.java | 14 +-
.../nifi/processors/standard/TestServer.java | 21 +-
.../processors/standard/TestSplitContent.java | 196 ++++------
.../nifi/processors/standard/TestSplitJson.java | 91 ++---
.../nifi/processors/standard/TestSplitText.java | 80 ++---
.../nifi/processors/standard/TestSplitXml.java | 4 +-
.../processors/standard/TestTransformXml.java | 45 +--
.../processors/standard/TestUnpackContent.java | 131 +++----
.../processors/standard/TestValidateXml.java | 4 +-
.../standard/UserAgentTestingServlet.java | 1 -
91 files changed, 2933 insertions(+), 5281 deletions(-)
----------------------------------------------------------------------
[08/10] incubator-nifi git commit: Merge branch 'NIFI-527' into develop
Posted by ma...@apache.org.
Merge branch 'NIFI-527' into develop
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/07596608
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/07596608
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/07596608
Branch: refs/heads/develop
Commit: 0759660897a8ca9e91d02157ec434abd5e2a7d7e
Parents: c10faca 7c41225
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Apr 27 10:47:38 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Apr 27 10:47:38 2015 -0400
----------------------------------------------------------------------
.../nifi/stream/io/ByteCountingInputStream.java | 5 +
.../stream/io/ByteCountingOutputStream.java | 8 +
.../nifi-framework/nifi-framework-core/pom.xml | 1 +
.../PersistentProvenanceRepository.java | 592 ++++++++++---------
.../provenance/RepositoryConfiguration.java | 14 +-
.../nifi/provenance/StandardRecordReader.java | 223 ++++++-
.../nifi/provenance/StandardRecordWriter.java | 114 +++-
.../provenance/lucene/DeleteIndexAction.java | 75 +--
.../nifi/provenance/lucene/DocsReader.java | 100 +++-
.../nifi/provenance/lucene/FieldNames.java | 1 +
.../nifi/provenance/lucene/IndexManager.java | 467 +++++++++++++++
.../nifi/provenance/lucene/IndexSearch.java | 71 ++-
.../nifi/provenance/lucene/IndexingAction.java | 183 +++---
.../nifi/provenance/lucene/LuceneUtil.java | 26 +-
.../provenance/serialization/RecordReader.java | 67 +++
.../provenance/serialization/RecordReaders.java | 139 ++---
.../provenance/serialization/RecordWriter.java | 6 +
.../provenance/serialization/RecordWriters.java | 13 +-
.../nifi/provenance/toc/StandardTocReader.java | 108 ++++
.../nifi/provenance/toc/StandardTocWriter.java | 120 ++++
.../apache/nifi/provenance/toc/TocReader.java | 58 ++
.../org/apache/nifi/provenance/toc/TocUtil.java | 37 ++
.../apache/nifi/provenance/toc/TocWriter.java | 52 ++
.../TestPersistentProvenanceRepository.java | 192 +++---
.../TestStandardRecordReaderWriter.java | 189 ++++++
.../org/apache/nifi/provenance/TestUtil.java | 82 +++
.../provenance/toc/TestStandardTocReader.java | 91 +++
.../provenance/toc/TestStandardTocWriter.java | 42 ++
.../nifi-standard-processors/pom.xml | 3 +
29 files changed, 2460 insertions(+), 619 deletions(-)
----------------------------------------------------------------------