You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/27 21:47:35 UTC
[08/12] incubator-nifi git commit: NIFI-388: Initial implementation
of prov repo; not yet finished but pushing so that the code is not lost
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/StandardQueryManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/StandardQueryManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/StandardQueryManager.java
index 4cce231..0753e9e 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/StandardQueryManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/StandardQueryManager.java
@@ -19,28 +19,39 @@ package org.apache.nifi.provenance.journaling.query;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.nifi.provenance.AsyncLineageSubmission;
import org.apache.nifi.provenance.AsyncQuerySubmission;
import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.StoredProvenanceEvent;
import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent;
import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
import org.apache.nifi.provenance.journaling.index.EventIndexSearcher;
+import org.apache.nifi.provenance.journaling.index.IndexAction;
+import org.apache.nifi.provenance.journaling.index.IndexManager;
import org.apache.nifi.provenance.journaling.index.QueryUtils;
import org.apache.nifi.provenance.journaling.index.SearchResult;
+import org.apache.nifi.provenance.journaling.index.VoidIndexAction;
import org.apache.nifi.provenance.journaling.journals.JournalReader;
import org.apache.nifi.provenance.journaling.journals.StandardJournalReader;
-import org.apache.nifi.provenance.journaling.partition.Partition;
-import org.apache.nifi.provenance.journaling.partition.PartitionManager;
-import org.apache.nifi.provenance.journaling.partition.VoidPartitionAction;
import org.apache.nifi.provenance.journaling.toc.StandardTocReader;
import org.apache.nifi.provenance.journaling.toc.TocReader;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.apache.nifi.provenance.lineage.LineageComputationType;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QuerySubmission;
import org.slf4j.Logger;
@@ -50,14 +61,17 @@ public class StandardQueryManager implements QueryManager {
private static final Logger logger = LoggerFactory.getLogger(StandardQueryManager.class);
private final int maxConcurrentQueries;
+ private final IndexManager indexManager;
+ private final ExecutorService executor;
private final JournalingRepositoryConfig config;
- private final PartitionManager partitionManager;
private final ConcurrentMap<String, AsyncQuerySubmission> querySubmissionMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, AsyncLineageSubmission> lineageSubmissionMap = new ConcurrentHashMap<>();
- public StandardQueryManager(final PartitionManager partitionManager, final JournalingRepositoryConfig config, final int maxConcurrentQueries) {
+ public StandardQueryManager(final IndexManager indexManager, final ExecutorService executor, final JournalingRepositoryConfig config, final int maxConcurrentQueries) {
this.config = config;
this.maxConcurrentQueries = maxConcurrentQueries;
- this.partitionManager = partitionManager;
+ this.indexManager = indexManager;
+ this.executor = executor;
}
@Override
@@ -74,12 +88,66 @@ public class StandardQueryManager implements QueryManager {
if (query.getSearchTerms().isEmpty() && query.getStartDate() == null && query.getEndDate() == null) {
final AsyncQuerySubmission result = new AsyncQuerySubmission(query, 1);
+ // empty query. Just get the latest events.
+ final Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ logger.debug("Fetching latest events from Provenance repo");
+ final long indexStartNanos = System.nanoTime();
+
+ // Query each index for the latest events.
+ final Set<List<JournaledStorageLocation>> locationSet = indexManager.withEachIndex(new IndexAction<List<JournaledStorageLocation>>() {
+ @Override
+ public List<JournaledStorageLocation> perform(final EventIndexSearcher searcher) throws IOException {
+ return searcher.getLatestEvents(query.getMaxResults());
+ }
+ });
+ final long indexMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - indexStartNanos);
+ final long retrievalStartNanos = System.nanoTime();
+
+ final List<JournaledStorageLocation> orderedLocations = new ArrayList<>();
+ for ( final List<JournaledStorageLocation> locations : locationSet ) {
+ orderedLocations.addAll(locations);
+ }
+
+ Collections.sort(orderedLocations, new Comparator<JournaledStorageLocation>() {
+ @Override
+ public int compare(final JournaledStorageLocation o1, final JournaledStorageLocation o2) {
+ return Long.compare(o1.getEventId(), o2.getEventId());
+ }
+ });
+
+ final List<JournaledStorageLocation> locationsToKeep;
+ if ( orderedLocations.size() > query.getMaxResults() ) {
+ locationsToKeep = orderedLocations.subList(0, query.getMaxResults());
+ } else {
+ locationsToKeep = orderedLocations;
+ }
+
+ final List<StoredProvenanceEvent> matchingRecords = getEvents(locationsToKeep, new AtomicInteger(locationsToKeep.size()));
+
+ final long totalNumEvents = indexManager.getNumberOfEvents();
+ final long retrievalMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - retrievalStartNanos);
+ logger.debug("Updated query result with {} matching records; total number of events = {}; index search took {} millis, event retrieval took {} millis", matchingRecords.size(), totalNumEvents, indexMillis, retrievalMillis);
+ result.getResult().update(matchingRecords, totalNumEvents);
+ } catch (final Exception e) {
+ result.getResult().setError("Failed to obtain latest events in repository due to " + e);
+ logger.error("Failed to obtain latest events in repository due to {}", e.toString());
+ if ( logger.isDebugEnabled() ) {
+ logger.error("", e);
+ }
+ }
+ }
+ };
+
+ executor.submit(runnable);
querySubmissionMap.put(query.getIdentifier(), result);
return result;
}
final AtomicInteger retrievalCount = new AtomicInteger(query.getMaxResults());
- final AsyncQuerySubmission submission = new AsyncQuerySubmission(query, config.getPartitionCount()) {
+ final AsyncQuerySubmission submission = new AsyncQuerySubmission(query, indexManager.getNumberOfIndices()) {
@Override
public void cancel() {
super.cancel();
@@ -89,56 +157,252 @@ public class StandardQueryManager implements QueryManager {
querySubmissionMap.put(query.getIdentifier(), submission);
- partitionManager.withEachPartition(new VoidPartitionAction() {
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public void perform(final Partition partition) throws IOException {
- logger.debug("Running {} against {}", query, partition);
-
- try (final EventIndexSearcher searcher = partition.newIndexSearcher()) {
- final SearchResult searchResult = searcher.search(query);
- logger.debug("{} has {} hits against {} over {} files", query, searchResult.getTotalCount(), partition, searchResult.getLocations().size());
-
- final List<StoredProvenanceEvent> matchingRecords = new ArrayList<>();
- final Map<File, List<JournaledStorageLocation>> locationMap = QueryUtils.orderLocations((List) searchResult.getLocations(), config);
- for ( final Map.Entry<File, List<JournaledStorageLocation>> entry : locationMap.entrySet() ) {
- final File journalFile = entry.getKey();
- final List<JournaledStorageLocation> locations = entry.getValue();
+ try {
+ indexManager.withEachIndex(new VoidIndexAction() {
+ @Override
+ public void perform(final EventIndexSearcher searcher) throws IOException {
+ try {
+ logger.debug("Running {} against {}", query, searcher);
- if ( retrievalCount.get() <= 0 ) {
- break;
- }
+ final long indexStart = System.nanoTime();
+ final SearchResult searchResult = searcher.search(query);
+ final long indexMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - indexStart);
+ logger.debug("{} has {} hits against {} over {} files", query, searchResult.getTotalCount(), searcher, searchResult.getLocations().size());
- try (final JournalReader reader = new StandardJournalReader(journalFile);
- final TocReader tocReader = new StandardTocReader(QueryUtils.getTocFile(journalFile))) {
-
- for ( final JournaledStorageLocation location : locations ) {
- final long blockOffset = tocReader.getBlockOffset(location.getBlockIndex());
- final ProvenanceEventRecord event = reader.getEvent(blockOffset, location.getEventId());
- matchingRecords.add(new JournaledProvenanceEvent(event, location));
-
- final int recordsLeft = retrievalCount.decrementAndGet();
- if ( recordsLeft <= 0 ) {
- break;
- }
- }
- }
+ final long retrievalStart = System.nanoTime();
+ final List<StoredProvenanceEvent> matchingRecords = getEvents(searchResult.getLocations(), retrievalCount);
+ final long retrievalMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - retrievalStart);
+
+ logger.debug("Finished executing {} against {}; found {} total matches, retrieved {} of them; index search took {} millis, record retrieval took {} millis",
+ query, searcher, searchResult.getTotalCount(), matchingRecords.size(), indexMillis, retrievalMillis);
+ submission.getResult().update(matchingRecords, searchResult.getTotalCount());
+ } catch (final Throwable t) {
+ submission.getResult().setError("Failed to execute query " + query + " against " + searcher + " due to " + t);
+ throw t;
}
-
- logger.debug("Finished executing {} against {}", query, partition);
- submission.getResult().update(matchingRecords, searchResult.getTotalCount());
- } catch (final Exception e) {
- submission.getResult().setError("Failed to query " + partition + " due to " + e.toString());
- throw e;
}
+ }, true);
+ } catch (final IOException ioe) {
+ // only set the error here if it's not already set because we have the least amount of information here
+ if ( submission.getResult().getError() == null ) {
+ submission.getResult().setError("Failed to execute query " + query + " due to " + ioe);
}
- }, true);
+ }
return submission;
}
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private List<StoredProvenanceEvent> getEvents(final List<JournaledStorageLocation> allLocations, final AtomicInteger retrievalCount) throws IOException {
+ final List<StoredProvenanceEvent> matchingRecords = new ArrayList<>();
+ final Map<File, List<JournaledStorageLocation>> locationMap = QueryUtils.orderLocations((List) allLocations, config);
+ for ( final Map.Entry<File, List<JournaledStorageLocation>> entry : locationMap.entrySet() ) {
+ final File journalFile = entry.getKey();
+ final List<JournaledStorageLocation> locations = entry.getValue();
+
+ if ( retrievalCount.get() <= 0 ) {
+ break;
+ }
+
+ try (final JournalReader reader = new StandardJournalReader(journalFile);
+ final TocReader tocReader = new StandardTocReader(QueryUtils.getTocFile(journalFile))) {
+
+ for ( final JournaledStorageLocation location : locations ) {
+ final long blockOffset = tocReader.getBlockOffset(location.getBlockIndex());
+ final ProvenanceEventRecord event = reader.getEvent(blockOffset, location.getEventId());
+ matchingRecords.add(new JournaledProvenanceEvent(event, location));
+
+ final int recordsLeft = retrievalCount.decrementAndGet();
+ if ( recordsLeft <= 0 ) {
+ break;
+ }
+ }
+ }
+ }
+
+ return matchingRecords;
+ }
+
@Override
public QuerySubmission retrieveQuerySubmission(final String queryIdentifier) {
return querySubmissionMap.get(queryIdentifier);
}
+
+ @Override
+ public ComputeLineageSubmission retrieveLineageSubmission(final String lineageIdentifier) {
+ return lineageSubmissionMap.get(lineageIdentifier);
+ }
+
+ @Override
+ public ComputeLineageSubmission submitLineageComputation(final String flowFileUuid) {
+ return submitLineageComputation(Collections.singleton(flowFileUuid), LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE);
+ }
+
+ private AsyncLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId, final long startTimestamp, final long endTimestamp) {
+ final AsyncLineageSubmission lineageSubmission = new AsyncLineageSubmission(computationType, eventId, flowFileUuids, indexManager.getNumberOfIndices());
+
+ final AtomicInteger retrievalCount = new AtomicInteger(2000);
+ final Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ indexManager.withEachIndex(new VoidIndexAction() {
+ @Override
+ public void perform(EventIndexSearcher searcher) throws IOException {
+ logger.debug("Obtaining lineage events for FlowFile UUIDs {} for {}", flowFileUuids, searcher);
+ final long startNanos = System.nanoTime();
+
+ final List<JournaledStorageLocation> locations = searcher.getEventsForFlowFiles(flowFileUuids, startTimestamp, endTimestamp);
+ final List<StoredProvenanceEvent> matchingRecords = getEvents(locations, retrievalCount);
+ lineageSubmission.getResult().update(matchingRecords);
+
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ logger.debug("Finished querying for lineage events; found {} events in {} millis", matchingRecords.size(), millis);
+ }
+ });
+ } catch (final IOException ioe) {
+ lineageSubmission.getResult().setError("Failed to calculate FlowFile Lineage due to " + ioe);
+ logger.error("Failed to calculate FlowFile Lineage due to {}", ioe.toString());
+ if ( logger.isDebugEnabled() ) {
+ logger.error("", ioe);
+ }
+ }
+ }
+ };
+
+ executor.submit(runnable);
+ lineageSubmissionMap.putIfAbsent(lineageSubmission.getLineageIdentifier(), lineageSubmission);
+ return lineageSubmission;
+ }
+
+
+ @Override
+ public ComputeLineageSubmission submitExpandChildren(final ProvenanceEventRepository eventRepo, final long eventId) {
+ final Set<String> flowFileUuids = Collections.synchronizedSet(new HashSet<String>());
+ final AsyncLineageSubmission lineageSubmission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, flowFileUuids, indexManager.getNumberOfIndices());
+
+ final Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ logger.debug("Obtaining event with id {} in order to expand children", eventId);
+ final StoredProvenanceEvent event = eventRepo.getEvent(eventId);
+ if ( event == null ) {
+ lineageSubmission.getResult().setError("Cannot expand children of event with ID " + eventId + " because that event cannot be found");
+ logger.warn("Cannot expand children of event with ID {} because that event cannot be found", eventId);
+ return;
+ }
+
+ logger.debug("Found event with id {}; searching for children", eventId);
+
+ switch (event.getEventType()) {
+ case CLONE:
+ case FORK:
+ case JOIN:
+ case REPLAY:
+ break;
+ default:
+ logger.warn("Cannot expand children of event with ID {} because event type is {}", eventId, event.getEventType());
+ lineageSubmission.getResult().setError("Cannot expand children of event with ID " + eventId +
+ " because that event is of type " + event.getEventType() +
+ ", and that type does not support expansion of children");
+ return;
+ }
+
+ final List<String> childUuids = event.getChildUuids();
+ flowFileUuids.addAll(childUuids);
+
+ final AtomicInteger retrievalCount = new AtomicInteger(100);
+ indexManager.withEachIndex(new VoidIndexAction() {
+ @Override
+ public void perform(EventIndexSearcher searcher) throws IOException {
+ final long startNanos = System.nanoTime();
+ logger.debug("Finding children of event with id {} using {}", eventId, searcher);
+
+ final List<JournaledStorageLocation> locations = searcher.getEventsForFlowFiles(flowFileUuids, event.getEventTime(), Long.MAX_VALUE);
+ final List<StoredProvenanceEvent> matchingRecords = getEvents(locations, retrievalCount);
+ lineageSubmission.getResult().update(matchingRecords);
+
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ logger.debug("Found {} children of event {} in {} millis", matchingRecords.size(), eventId, millis);
+ }
+ });
+ } catch (final IOException ioe) {
+
+ }
+ }
+ };
+
+ executor.submit(runnable);
+ lineageSubmissionMap.putIfAbsent(lineageSubmission.getLineageIdentifier(), lineageSubmission);
+ return lineageSubmission;
+ }
+
+ @Override
+ public ComputeLineageSubmission submitExpandParents(final ProvenanceEventRepository eventRepo, final long eventId) {
+ final Set<String> flowFileUuids = Collections.synchronizedSet(new HashSet<String>());
+ final AsyncLineageSubmission lineageSubmission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, flowFileUuids, indexManager.getNumberOfIndices());
+
+ final Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ logger.debug("Obtaining event with id {} in order to expand children", eventId);
+ final StoredProvenanceEvent event = eventRepo.getEvent(eventId);
+ if ( event == null ) {
+ logger.warn("Cannot expand children of event with ID {} because that event cannot be found", eventId);
+ lineageSubmission.getResult().setError("Cannot expand children of event with ID " + eventId + " because that event cannot be found");
+ return;
+ }
+
+ logger.debug("Found event with id {}; searching for children", eventId);
+
+ switch (event.getEventType()) {
+ case CLONE:
+ case FORK:
+ case JOIN:
+ case REPLAY:
+ break;
+ default:
+ logger.warn("Cannot expand parents of event with ID {} because event type is {}", eventId, event.getEventType());
+ lineageSubmission.getResult().setError("Cannot expand parents of event with ID " + eventId +
+ " because that event is of type " + event.getEventType() +
+ ", and that type does not support expansion of children");
+ return;
+ }
+
+ final List<String> parentUuids = event.getParentUuids();
+ flowFileUuids.addAll(parentUuids);
+
+ final AtomicInteger retrievalCount = new AtomicInteger(100);
+ indexManager.withEachIndex(new VoidIndexAction() {
+ @Override
+ public void perform(EventIndexSearcher searcher) throws IOException {
+ final long startNanos = System.nanoTime();
+ logger.debug("Finding parents of event with id {} using {}", eventId, searcher);
+
+ final List<JournaledStorageLocation> locations = searcher.getEventsForFlowFiles(flowFileUuids, event.getLineageStartDate(), event.getEventTime());
+ final List<StoredProvenanceEvent> matchingRecords = getEvents(locations, retrievalCount);
+ lineageSubmission.getResult().update(matchingRecords);
+
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ logger.debug("Found {} parents of event {} in {} millis", matchingRecords.size(), eventId, millis);
+ }
+ });
+ } catch (final IOException ioe) {
+
+ }
+ }
+ };
+
+ executor.submit(runnable);
+ lineageSubmissionMap.putIfAbsent(lineageSubmission.getLineageIdentifier(), lineageSubmission);
+ return lineageSubmission;
+ }
+
+
+ @Override
+ public void close() throws IOException {
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
index a6a487b..fc9fb46 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
@@ -18,8 +18,11 @@ package org.apache.nifi.provenance.journaling.tasks;
import java.io.EOFException;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.journaling.io.StandardEventSerializer;
@@ -34,7 +37,10 @@ import org.apache.nifi.provenance.journaling.toc.TocWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class CompressionTask implements Runnable {
+/**
+ * Compresses a journal file and returns the new size of the journal
+ */
+public class CompressionTask implements Callable<Long> {
public static final String FILE_EXTENSION = ".compress";
private static final Logger logger = LoggerFactory.getLogger(CompressionTask.class);
@@ -56,7 +62,10 @@ public class CompressionTask implements Runnable {
long blockOffset = tocReader.getBlockOffset(blockIndex);
tocWriter.addBlockOffset(blockOffset);
long nextBlockOffset = tocReader.getBlockOffset(blockIndex + 1);
-
+
+ // we write the events one at a time here so that whenever we cross a block boundary
+ // we can insert a new block offset into the TOC. Each compressed block must hold exactly
+ // the same events as the original block, because the index refers to events by block index.
try {
while ((event = reader.nextEvent()) != null) {
// Check if we've gone beyond the offset of the next block. If so, write
@@ -97,7 +106,7 @@ public class CompressionTask implements Runnable {
}
}
- return false;
+ return !file.exists();
}
/**
@@ -125,11 +134,24 @@ public class CompressionTask implements Runnable {
}
@Override
- public void run() {
+ public Long call() {
+ final long startNanos = System.nanoTime();
+ final long preCompressionSize = journalFile.length();
+
try {
final File compressedFile = new File(journalFile.getParentFile(), journalFile.getName() + FILE_EXTENSION);
final File compressedTocFile = new File(tocFile.getParentFile(), tocFile.getName() + FILE_EXTENSION);
+ if ( compressedFile.exists() && !compressedFile.delete() ) {
+ logger.error("Compressed file {} already exists and could not remove it; compression task failed", compressedFile);
+ return preCompressionSize;
+ }
+
+ if ( compressedTocFile.exists() && !compressedTocFile.delete() ) {
+ logger.error("Compressed TOC file {} already exists and could not remove it; compression task failed", compressedTocFile);
+ return preCompressionSize;
+ }
+
try (final JournalReader journalReader = new StandardJournalReader(journalFile);
final JournalWriter compressedWriter = new StandardJournalWriter(journalId, compressedFile, true, new StandardEventSerializer());
final TocReader tocReader = new StandardTocReader(tocFile);
@@ -137,14 +159,19 @@ public class CompressionTask implements Runnable {
compress(journalReader, compressedWriter, tocReader, compressedTocWriter);
compressedWriter.sync();
+ } catch (final FileNotFoundException fnfe) {
+ logger.info("Failed to compress Journal File {} because it has already been removed", journalFile);
+ return 0L;
}
-
+
+ final long postCompressionSize = compressedFile.length();
+
final boolean deletedJournal = delete(journalFile);
if ( !deletedJournal ) {
delete(compressedFile);
delete(compressedTocFile);
logger.error("Failed to remove Journal file {}; considering compression task a failure", journalFile);
- return;
+ return preCompressionSize;
}
final boolean deletedToc = delete(tocFile);
@@ -152,7 +179,7 @@ public class CompressionTask implements Runnable {
delete(compressedFile);
delete(compressedTocFile);
logger.error("Failed to remove TOC file for {}; considering compression task a failure", journalFile);
- return;
+ return preCompressionSize;
}
final boolean renamedJournal = rename(compressedFile, journalFile);
@@ -165,12 +192,18 @@ public class CompressionTask implements Runnable {
logger.error("Failed to rename {} to {}; this journal file may be inaccessible until it is renamed", compressedTocFile, tocFile);
}
- logger.info("Successfully compressed Journal File {}");
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ final double percent = (postCompressionSize / preCompressionSize) * 100D;
+ final String pct = String.format("%.2f", percent);
+ logger.info("Successfully compressed Journal File {} in {} millis; size changed from {} bytes to {} bytes ({}% of original size)", journalFile, millis, preCompressionSize, postCompressionSize, pct);
+ return postCompressionSize;
} catch (final IOException ioe) {
logger.error("Failed to compress Journal File {} due to {}", journalFile, ioe.toString());
if ( logger.isDebugEnabled() ) {
logger.error("", ioe);
}
+
+ return preCompressionSize;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocReader.java
index 995acf9..ae4635f 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocReader.java
@@ -83,6 +83,14 @@ public class StandardTocReader implements TocReader {
}
@Override
+ public long getLastBlockOffset() {
+ if ( offsets.length == 0 ) {
+ return 0L;
+ }
+ return offsets[offsets.length - 1];
+ }
+
+ @Override
public void close() throws IOException {
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
index fea6057..9ee07e0 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
@@ -16,12 +16,17 @@
*/
package org.apache.nifi.provenance.journaling.toc;
+import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.EOFException;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.file.FileAlreadyExistsException;
/**
@@ -51,7 +56,24 @@ public class StandardTocWriter implements TocWriter {
*/
public StandardTocWriter(final File file, final boolean compressionFlag, final boolean alwaysSync) throws IOException {
if ( file.exists() ) {
- throw new FileAlreadyExistsException(file.getAbsolutePath());
+ // Check if the header actually exists. If so, throw FileAlreadyExistsException
+ // If no data is in the file, we will just overwrite it.
+ try (final InputStream fis = new FileInputStream(file);
+ final InputStream bis = new BufferedInputStream(fis);
+ final DataInputStream dis = new DataInputStream(bis)) {
+ dis.read();
+ dis.read();
+
+ // we always add the first offset when the writer is created so we allow this to exist.
+ dis.readLong();
+ final int nextByte = dis.read();
+
+ if ( nextByte > -1 ) {
+ throw new FileAlreadyExistsException(file.getAbsolutePath());
+ }
+ } catch (final EOFException eof) {
+ // no real data. overwrite file.
+ }
}
if ( !file.getParentFile().exists() && !file.getParentFile().mkdirs() ) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocJournalReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocJournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocJournalReader.java
index eca664e..c3f9df5 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocJournalReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocJournalReader.java
@@ -33,13 +33,13 @@ public class TocJournalReader implements Closeable {
private final String containerName;
private final String sectionName;
- private final String journalId;
+ private final Long journalId;
private int blockIndex;
private long nextBlockOffset;
- public TocJournalReader(final String containerName, final String sectionName, final String journalId, final File journalFile) throws IOException {
+ public TocJournalReader(final String containerName, final String sectionName, final Long journalId, final File journalFile) throws IOException {
this.containerName = containerName;
this.sectionName = sectionName;
this.journalId = journalId;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocReader.java
index 9f6a264..18d7189 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/TocReader.java
@@ -43,4 +43,9 @@ public interface TocReader extends Closeable {
*/
long getBlockOffset(int blockIndex);
+ /**
+ * Returns the byte offset into the Journal File of the last Block in the Table of Contents
+ * @return the byte offset into the Journal File of the last Block
+ */
+ long getLastBlockOffset();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java
index a547a8a..30e100a 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java
@@ -23,14 +23,23 @@ import static org.junit.Assert.assertNull;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.SearchableFields;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.provenance.StoredProvenanceEvent;
import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.apache.nifi.provenance.lineage.LineageNode;
+import org.apache.nifi.provenance.lineage.LineageNodeType;
+import org.apache.nifi.provenance.lineage.ProvenanceEventLineageNode;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QueryResult;
import org.apache.nifi.provenance.search.QuerySubmission;
@@ -93,6 +102,190 @@ public class TestJournalingProvenanceRepository {
}
+ @Test
+ public void testStoreRestartAndRetrieve() throws IOException {
+ final JournalingRepositoryConfig config = new JournalingRepositoryConfig();
+ final Map<String, File> containers = new HashMap<>();
+ containers.put("container1", new File("target/" + UUID.randomUUID().toString()));
+ containers.put("container2", new File("target/" + UUID.randomUUID().toString()));
+ config.setContainers(containers);
+ config.setPartitionCount(3);
+
+ try {
+ try (final JournalingProvenanceRepository repo = new JournalingProvenanceRepository(config)) {
+ repo.initialize(null);
+ final Map<String, String> attributes = new HashMap<>();
+
+ for (int i=0; i < 10; i++) {
+ attributes.put("i", String.valueOf(i));
+ repo.registerEvent(TestUtil.generateEvent(i, attributes));
+ }
+
+ assertEquals(10L, repo.getMaxEventId().longValue());
+ }
+
+ try (final JournalingProvenanceRepository repo = new JournalingProvenanceRepository(config)) {
+ repo.initialize(null);
+
+ assertEquals(10L, repo.getMaxEventId().longValue());
+
+ // retrieve records one at a time.
+ for (int i=0; i < 10; i++) {
+ final StoredProvenanceEvent event = repo.getEvent(i);
+ assertNotNull(event);
+ assertEquals((long) i, event.getEventId());
+ assertEquals("00000000-0000-0000-0000-00000000000" + i, event.getFlowFileUuid());
+ }
+
+ final List<StoredProvenanceEvent> events = repo.getEvents(0, 1000);
+ assertNotNull(events);
+ assertEquals(10, events.size());
+ for (int i=0; i < 10; i++) {
+ final StoredProvenanceEvent event = events.get(i);
+ assertNotNull(event);
+ assertEquals((long) i, event.getEventId());
+ assertEquals("00000000-0000-0000-0000-00000000000" + i, event.getFlowFileUuid());
+ }
+ }
+ } finally {
+ for ( final File file : containers.values() ) {
+ if ( file.exists() ) {
+ FileUtils.deleteFile(file, true);
+ }
+ }
+ }
+ }
+
+
+ @Test
+ public void testStoreRestartRetrieveAndExpireOnTime() throws IOException, InterruptedException {
+ final JournalingRepositoryConfig config = new JournalingRepositoryConfig();
+ final Map<String, File> containers = new HashMap<>();
+ containers.put("container1", new File("target/" + UUID.randomUUID().toString()));
+ containers.put("container2", new File("target/" + UUID.randomUUID().toString()));
+ config.setContainers(containers);
+ config.setPartitionCount(3);
+
+ try {
+ try (final JournalingProvenanceRepository repo = new JournalingProvenanceRepository(config)) {
+ repo.initialize(null);
+ final Map<String, String> attributes = new HashMap<>();
+
+ for (int i=0; i < 10; i++) {
+ attributes.put("i", String.valueOf(i));
+ repo.registerEvent(TestUtil.generateEvent(i, attributes));
+ }
+
+ assertEquals(10L, repo.getMaxEventId().longValue());
+ }
+
+ config.setExpirationFrequency(1, TimeUnit.SECONDS);
+ try (final JournalingProvenanceRepository repo = new JournalingProvenanceRepository(config)) {
+ repo.initialize(null);
+
+ assertEquals(10L, repo.getMaxEventId().longValue());
+
+ // retrieve records one at a time.
+ for (int i=0; i < 10; i++) {
+ final StoredProvenanceEvent event = repo.getEvent(i);
+ assertNotNull(event);
+ assertEquals((long) i, event.getEventId());
+ assertEquals("00000000-0000-0000-0000-00000000000" + i, event.getFlowFileUuid());
+ }
+
+ final List<StoredProvenanceEvent> events = repo.getEvents(0, 1000);
+ assertNotNull(events);
+ assertEquals(10, events.size());
+ for (int i=0; i < 10; i++) {
+ final StoredProvenanceEvent event = events.get(i);
+ assertNotNull(event);
+ assertEquals((long) i, event.getEventId());
+ assertEquals("00000000-0000-0000-0000-00000000000" + i, event.getFlowFileUuid());
+ }
+
+ // wait a bit for the events to be expired
+ TimeUnit.SECONDS.sleep(2L);
+
+ // retrieve records one at a time.
+ for (int i=0; i < 10; i++) {
+ final StoredProvenanceEvent event = repo.getEvent(i);
+ assertNull("Event " + i + " still exists", event);
+ }
+
+ final List<StoredProvenanceEvent> allEvents = repo.getEvents(0, 1000);
+ assertNotNull(allEvents);
+ assertEquals(0, allEvents.size());
+ }
+ } finally {
+ for ( final File file : containers.values() ) {
+ if ( file.exists() ) {
+ FileUtils.deleteFile(file, true);
+ }
+ }
+ }
+ }
+
+
+ @Test
+ public void testExpireOnSize() throws IOException, InterruptedException {
+ final JournalingRepositoryConfig config = new JournalingRepositoryConfig();
+ final Map<String, File> containers = new HashMap<>();
+ containers.put("container1", new File("target/" + UUID.randomUUID().toString()));
+ containers.put("container2", new File("target/" + UUID.randomUUID().toString()));
+ config.setContainers(containers);
+ config.setPartitionCount(3);
+ config.setMaxStorageCapacity(1024L * 50);
+ config.setEventExpiration(2, TimeUnit.SECONDS);
+ config.setExpirationFrequency(1, TimeUnit.SECONDS);
+ config.setJournalRolloverPeriod(1, TimeUnit.SECONDS);
+ config.setCompressOnRollover(false);
+
+ try {
+ try (final JournalingProvenanceRepository repo = new JournalingProvenanceRepository(config)) {
+ repo.initialize(null);
+ final Map<String, String> attributes = new HashMap<>();
+
+ final int numEventsToInsert = 1000;
+ for (int i=0; i < numEventsToInsert; i++) {
+ attributes.put("i", String.valueOf(i));
+ repo.registerEvent(TestUtil.generateEvent(i, attributes));
+ }
+
+ final List<StoredProvenanceEvent> eventsBeforeExpire = repo.getEvents(0, numEventsToInsert * 2);
+ assertNotNull(eventsBeforeExpire);
+ assertEquals(numEventsToInsert, eventsBeforeExpire.size());
+
+ // wait a bit for expiration to occur
+ TimeUnit.SECONDS.sleep(3L);
+
+ // generate an event for each partition to force a rollover of the journals
+ for (int i=0; i < config.getPartitionCount(); i++) {
+ repo.registerEvent(TestUtil.generateEvent(100000L));
+ }
+
+ TimeUnit.SECONDS.sleep(1L);
+
+ // retrieve records one at a time.
+ for (int i=0; i < numEventsToInsert; i++) {
+ final StoredProvenanceEvent event = repo.getEvent(i);
+ assertNull("Event " + i + " still exists", event);
+ }
+
+ final List<StoredProvenanceEvent> eventsAfterExpire = repo.getEvents(0, numEventsToInsert * 2);
+ assertNotNull(eventsAfterExpire);
+ assertEquals(3, eventsAfterExpire.size());
+ }
+ } finally {
+ for ( final File file : containers.values() ) {
+ if ( file.exists() ) {
+ FileUtils.deleteFile(file, true);
+ }
+ }
+ }
+ }
+
+
+
@Test(timeout=10000000)
public void testSearchByUUID() throws IOException, InterruptedException {
final JournalingRepositoryConfig config = new JournalingRepositoryConfig();
@@ -141,4 +334,98 @@ public class TestJournalingProvenanceRepository {
}
}
+
+ @Test(timeout=10000)
+ public void testReceiveDropLineage() throws IOException, InterruptedException {
+ final JournalingRepositoryConfig config = new JournalingRepositoryConfig();
+ final Map<String, File> containers = new HashMap<>();
+ containers.put("container1", new File("target/" + UUID.randomUUID().toString()));
+ containers.put("container2", new File("target/" + UUID.randomUUID().toString()));
+ config.setContainers(containers);
+
+ config.setPartitionCount(3);
+ config.setSearchableFields(Arrays.asList(new SearchableField[] {
+ SearchableFields.FlowFileUUID
+ }));
+
+ try (final JournalingProvenanceRepository repo = new JournalingProvenanceRepository(config)) {
+ repo.initialize(null);
+
+ final String uuid = "00000000-0000-0000-0000-000000000001";
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("abc", "xyz");
+ attributes.put("uuid", uuid);
+ attributes.put("filename", "file-" + uuid);
+
+ final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder()
+ .setEventType(ProvenanceEventType.RECEIVE)
+ .setFlowFileUUID(uuid)
+ .setComponentType("Unit Test")
+ .setComponentId(UUID.randomUUID().toString())
+ .setEventTime(System.currentTimeMillis())
+ .setFlowFileEntryDate(System.currentTimeMillis() - 1000L)
+ .setLineageStartDate(System.currentTimeMillis() - 2000L)
+ .setCurrentContentClaim(null, null, null, null, 0L)
+ .setAttributes(null, attributes == null ? Collections.<String, String>emptyMap() : attributes);
+
+ builder.setTransitUri("nifi://unit-test");
+ attributes.put("uuid", uuid);
+ builder.setComponentId("1234");
+ builder.setComponentType("dummy processor");
+
+ // Add RECEIVE Event
+ repo.registerEvent(builder.build());
+
+ builder.setEventTime(System.currentTimeMillis() + 1);
+ builder.setEventType(ProvenanceEventType.DROP);
+ builder.setTransitUri(null);
+
+ // Add DROP event
+ repo.registerEvent(builder.build());
+
+ // register an unrelated event to make sure we don't get this one.
+ builder.setFlowFileUUID("00000000-0000-0000-0000-000000000002");
+ repo.registerEvent(builder.build());
+
+ final ComputeLineageSubmission submission = repo.submitLineageComputation(uuid);
+ assertNotNull(submission);
+
+ final ComputeLineageResult result = submission.getResult();
+ while ( !result.isFinished() ) {
+ Thread.sleep(50L);
+ }
+
+ assertNull(result.getError());
+
+ final List<LineageNode> nodes = result.getNodes();
+ assertEquals(3, nodes.size()); // RECEIVE, FlowFile node, DROP
+
+ int receiveCount = 0;
+ int dropCount = 0;
+ int flowFileNodeCount = 0;
+ for ( final LineageNode node : nodes ) {
+ assertEquals(uuid, node.getFlowFileUuid());
+
+ if ( LineageNodeType.PROVENANCE_EVENT_NODE.equals(node.getNodeType()) ) {
+ final ProvenanceEventLineageNode eventNode = (ProvenanceEventLineageNode) node;
+ if ( eventNode.getEventType() == ProvenanceEventType.RECEIVE ) {
+ receiveCount++;
+ } else if ( eventNode.getEventType() == ProvenanceEventType.DROP ) {
+ dropCount++;
+ }
+ } else {
+ flowFileNodeCount++;
+ }
+ }
+
+ assertEquals(1, receiveCount);
+ assertEquals(1, dropCount);
+ assertEquals(1, flowFileNodeCount);
+ } finally {
+ for ( final File file : containers.values() ) {
+ FileUtils.deleteFile(file, true);
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/index/TestEventIndexWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/index/TestEventIndexWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/index/TestEventIndexWriter.java
index dfaeb1a..e611aaa 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/index/TestEventIndexWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/index/TestEventIndexWriter.java
@@ -35,6 +35,7 @@ import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.SearchTerms;
import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.util.file.FileUtils;
import org.junit.Test;
public class TestEventIndexWriter {
@@ -51,11 +52,9 @@ public class TestEventIndexWriter {
final File indexDir = new File("target/" + UUID.randomUUID().toString());
- final File journalFile = new File("target/" + UUID.randomUUID().toString());
try (final LuceneIndexWriter indexWriter = new LuceneIndexWriter(indexDir, config)) {
-
final ProvenanceEventRecord event = TestUtil.generateEvent(23L);
- final JournaledStorageLocation location = new JournaledStorageLocation("container", "section", "journalId", 2, 23L);
+ final JournaledStorageLocation location = new JournaledStorageLocation("container", "section", 1L, 2, 23L);
final JournaledProvenanceEvent storedEvent = new JournaledProvenanceEvent(event, location);
indexWriter.index(Collections.singleton(storedEvent));
@@ -72,12 +71,12 @@ public class TestEventIndexWriter {
assertNotNull(found);
assertEquals("container", found.getContainerName());
assertEquals("section", found.getSectionName());
- assertEquals("journalId", found.getJournalId());
+ assertEquals(1L, found.getJournalId().longValue());
assertEquals(2, found.getBlockIndex());
assertEquals(23L, found.getEventId());
}
} finally {
- journalFile.delete();
+ FileUtils.deleteFile(indexDir, true);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
index d5eab8e..43efa7e 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/journals/TestStandardJournalWriter.java
@@ -17,11 +17,13 @@
package org.apache.nifi.provenance.journaling.journals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.util.Collections;
import java.util.UUID;
@@ -32,11 +34,47 @@ import org.apache.nifi.provenance.journaling.TestUtil;
import org.apache.nifi.provenance.journaling.io.StandardEventDeserializer;
import org.apache.nifi.provenance.journaling.io.StandardEventSerializer;
import org.apache.nifi.remote.io.CompressionInputStream;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.Assert;
import org.junit.Test;
public class TestStandardJournalWriter {
@Test
+ public void testOverwriteEmptyFile() throws IOException {
+ final File journalFile = new File("target/" + UUID.randomUUID().toString());
+ try {
+ assertTrue( journalFile.createNewFile() );
+
+ try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, new StandardEventSerializer())) {
+
+ }
+ } finally {
+ FileUtils.deleteFile(journalFile, false);
+ }
+ }
+
+ @Test
+ public void testDoNotOverwriteNonEmptyFile() throws IOException {
+ final File journalFile = new File("target/" + UUID.randomUUID().toString());
+ try {
+ assertTrue( journalFile.createNewFile() );
+
+ try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, new StandardEventSerializer())) {
+ writer.write(Collections.singleton(TestUtil.generateEvent(1L)), 1L);
+ }
+
+ try (final StandardJournalWriter writer = new StandardJournalWriter(1L, journalFile, true, new StandardEventSerializer())) {
+ Assert.fail("StandardJournalWriter attempted to overwrite existing file");
+ } catch (final FileAlreadyExistsException faee) {
+ // expected
+ }
+ } finally {
+ FileUtils.deleteFile(journalFile, false);
+ }
+ }
+
+ @Test
public void testOneBlockOneRecordWriteCompressed() throws IOException {
final File journalFile = new File("target/" + UUID.randomUUID().toString());
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/toc/TestStandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/toc/TestStandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/toc/TestStandardTocWriter.java
new file mode 100644
index 0000000..3cddb5e
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/toc/TestStandardTocWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.toc;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.UUID;
+
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestStandardTocWriter {
+ @Test
+ public void testOverwriteEmptyFile() throws IOException {
+ final File tocFile = new File("target/" + UUID.randomUUID().toString() + ".toc");
+ try {
+ assertTrue( tocFile.createNewFile() );
+
+ try (final StandardTocWriter writer = new StandardTocWriter(tocFile, false, false)) {
+ }
+ } finally {
+ FileUtils.deleteFile(tocFile, false);
+ }
+ }
+
+ @Test
+ public void testDoNotOverwriteNonEmptyFile() throws IOException {
+ final File tocFile = new File("target/" + UUID.randomUUID().toString() + ".toc");
+ try {
+ assertTrue( tocFile.createNewFile() );
+
+ try (final StandardTocWriter writer = new StandardTocWriter(tocFile, false, false)) {
+ writer.addBlockOffset(0L);
+ writer.addBlockOffset(34L);
+ }
+
+ try (final StandardTocWriter writer = new StandardTocWriter(tocFile, false, false)) {
+ Assert.fail("StandardTocWriter attempted to overwrite existing file");
+ } catch (final FileAlreadyExistsException faee) {
+ // expected
+ }
+ } finally {
+ FileUtils.deleteFile(tocFile, false);
+ }
+ }
+}