You are viewing a plain-text version of this content. (The canonical link was not preserved in this text rendering.)
Posted to commits@nifi.apache.org by jo...@apache.org on 2016/07/29 13:34:18 UTC
nifi git commit: NIFI-2395 This closes #734. Ensure that if we fail
to index provenance events we do not prevent the repo from continuing to
merge journals
Repository: nifi
Updated Branches:
refs/heads/master cddbe7d41 -> cfc8a9613
NIFI-2395 This closes #734. Ensure that if we fail to index provenance events we do not prevent the repo from continuing to merge journals
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/cfc8a961
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/cfc8a961
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/cfc8a961
Branch: refs/heads/master
Commit: cfc8a9613cb071247ef22f8fe4a3abb4e6b83151
Parents: cddbe7d
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Jul 28 10:19:45 2016 -0400
Committer: joewitt <jo...@apache.org>
Committed: Fri Jul 29 09:33:47 2016 -0400
----------------------------------------------------------------------
.../PersistentProvenanceRepository.java | 64 ++++++++++++++++----
.../TestPersistentProvenanceRepository.java | 54 +++++++++++++++++
2 files changed, 106 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/cfc8a961/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 7e8b9a6..aee8277 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -123,6 +123,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
public static final Pattern INDEX_PATTERN = Pattern.compile("index-\\d+");
public static final Pattern LOG_FILENAME_PATTERN = Pattern.compile("(\\d+).*\\.prov");
public static final int MAX_UNDELETED_QUERY_RESULTS = 10;
+ public static final int MAX_INDEXING_FAILURE_COUNT = 5; // how many indexing failures we will tolerate before skipping indexing for a prov file
private static final Logger logger = LoggerFactory.getLogger(PersistentProvenanceRepository.class);
@@ -1648,7 +1649,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
try (final RecordWriter writer = RecordWriters.newRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) {
writer.writeHeader(minEventId);
- final IndexingAction indexingAction = new IndexingAction(this);
+ final IndexingAction indexingAction = createIndexingAction();
final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile, earliestTimestamp);
long maxId = 0L;
@@ -1668,24 +1669,33 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
}
});
+ final AtomicInteger indexingFailureCount = new AtomicInteger(0);
try {
for (int i = 0; i < configuration.getIndexThreadPoolSize(); i++) {
final Callable<Object> callable = new Callable<Object>() {
@Override
public Object call() throws IOException {
while (!eventQueue.isEmpty() || !finishedAdding.get()) {
- final Tuple<StandardProvenanceEventRecord, Integer> tuple;
try {
- tuple = eventQueue.poll(10, TimeUnit.MILLISECONDS);
- } catch (final InterruptedException ie) {
- continue;
+ final Tuple<StandardProvenanceEventRecord, Integer> tuple;
+ try {
+ tuple = eventQueue.poll(10, TimeUnit.MILLISECONDS);
+ } catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ continue;
+ }
+
+ if (tuple == null) {
+ continue;
+ }
+
+ indexingAction.index(tuple.getKey(), indexWriter, tuple.getValue());
+ } catch (final Throwable t) {
+ logger.error("Failed to index Provenance Event for " + writerFile + " to " + indexingDirectory, t);
+ if (indexingFailureCount.incrementAndGet() >= MAX_INDEXING_FAILURE_COUNT) {
+ return null;
+ }
}
-
- if (tuple == null) {
- continue;
- }
-
- indexingAction.index(tuple.getKey(), indexWriter, tuple.getValue());
}
return null;
@@ -1696,6 +1706,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
futures.add(future);
}
+ boolean indexEvents = true;
while (!recordToReaderMap.isEmpty()) {
final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
final StandardProvenanceEventRecord record = entry.getKey();
@@ -1705,12 +1716,30 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
final int blockIndex = writer.getTocWriter().getCurrentBlockIndex();
boolean accepted = false;
- while (!accepted) {
+ while (!accepted && indexEvents) {
try {
accepted = eventQueue.offer(new Tuple<>(record, blockIndex), 10, TimeUnit.MILLISECONDS);
} catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+
+ // If we weren't able to add anything to the queue, check if we have reached our max failure count.
+ // We do this here because if we do reach our max failure count, all of the indexing threads will stop
+ // performing their jobs. As a result, the queue will fill and we won't be able to add anything to it.
+ // So, if the queue is filled, we will check if this is the case.
+ if (!accepted && indexingFailureCount.get() >= MAX_INDEXING_FAILURE_COUNT) {
+ indexEvents = false; // don't add anything else to the queue.
+ eventQueue.clear();
+
+ final String warning = String.format("Indexing Provenance Events for %s has failed %s times. This exceeds the maximum threshold of %s failures, "
+ + "so no more Provenance Events will be indexed for this Provenance file.", writerFile, indexingFailureCount.get(), MAX_INDEXING_FAILURE_COUNT);
+ logger.warn(warning);
+ if (eventReporter != null) {
+ eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, warning);
+ }
}
}
+
maxId = record.getEventId();
latestRecords.add(truncateAttributes(record));
@@ -1747,6 +1776,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
throw new RuntimeException(t);
} catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new RuntimeException("Thread interrupted");
}
}
@@ -1810,6 +1840,15 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
return writerFile;
}
+ /**
+ * Creates the IndexingAction used to index Provenance Events; by default, a new IndexingAction constructed with this
+ * repository. The method is protected so that unit tests can extend this class and override it, supplying an
+ * IndexingAction that throws Exceptions, counts the events indexed, etc.
+ */
+ protected IndexingAction createIndexingAction() {
+ return new IndexingAction(this);
+ }
+
private StandardProvenanceEventRecord truncateAttributes(final StandardProvenanceEventRecord original) {
boolean requireTruncation = false;
@@ -2264,6 +2303,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
throw new AccessDeniedException("Cannot retrieve Provenance Query Submission because " + user.getIdentity() + " is not the user who submitted the request");
}
+ @Override
public ProvenanceEventRecord getEvent(final long id) throws IOException {
final List<ProvenanceEventRecord> records = getEvents(id, 1);
if (records.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/cfc8a961/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index c4fe6ed..d7738e7 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -20,6 +20,7 @@ import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.core.SimpleAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.queryparser.classic.QueryParser;
import org.apache.lucene.search.IndexSearcher;
@@ -35,6 +36,7 @@ import org.apache.nifi.provenance.lineage.Lineage;
import org.apache.nifi.provenance.lineage.LineageEdge;
import org.apache.nifi.provenance.lineage.LineageNode;
import org.apache.nifi.provenance.lineage.LineageNodeType;
+import org.apache.nifi.provenance.lucene.IndexingAction;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QueryResult;
import org.apache.nifi.provenance.search.QuerySubmission;
@@ -1536,6 +1538,58 @@ public class TestPersistentProvenanceRepository {
}
+ @Test(timeout=5000)
+ public void testExceptionOnIndex() throws IOException { // checks that indexing stops after MAX_INDEXING_FAILURE_COUNT failures and one warning is reported
+ final RepositoryConfiguration config = createConfiguration();
+ config.setMaxAttributeChars(50);
+ config.setMaxEventFileLife(3, TimeUnit.SECONDS);
+ config.setIndexThreadPoolSize(1); // NOTE(review): presumably a single indexing thread keeps the index() call count deterministic - confirm
+
+ final int numEventsToIndex = 10; // number of index() calls allowed to succeed before failures begin
+
+ final AtomicInteger indexedEventCount = new AtomicInteger(0); // counts every call to index(), successful or not
+ repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
+ @Override
+ protected synchronized IndexingAction createIndexingAction() {
+ return new IndexingAction(repo) {
+ @Override
+ public void index(StandardProvenanceEventRecord record, IndexWriter indexWriter, Integer blockIndex) throws IOException {
+ final int count = indexedEventCount.incrementAndGet();
+ if (count <= numEventsToIndex) { // the first numEventsToIndex calls return normally without indexing anything
+ return;
+ }
+
+ throw new IOException("Unit Test - Intentional Exception"); // every later call fails
+ }
+ };
+ }
+ };
+ repo.initialize(getEventReporter(), null, null);
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+
+ final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+ builder.setEventTime(System.currentTimeMillis());
+ builder.setEventType(ProvenanceEventType.RECEIVE);
+ builder.setTransitUri("nifi://unit-test");
+ builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+ builder.setComponentId("1234");
+ builder.setComponentType("dummy processor");
+
+ for (int i=0; i < 1000; i++) { // register far more events than the expected number of index() calls
+ final ProvenanceEventRecord record = builder.build();
+ repo.registerEvent(record);
+ }
+
+ repo.waitForRollover();
+
+ assertEquals(numEventsToIndex + PersistentProvenanceRepository.MAX_INDEXING_FAILURE_COUNT, indexedEventCount.get()); // 10 successful calls plus the tolerated failing calls, and none after that
+ assertEquals(1, reportedEvents.size()); // exactly one event is expected to have been reported
+ final ReportedEvent event = reportedEvents.get(0);
+ assertEquals(Severity.WARNING, event.getSeverity()); // and it must carry WARNING severity
+ }
+
@Test
public void testFailureToCreateWriterDoesNotPreventSubsequentRollover() throws IOException, InterruptedException {
final RepositoryConfiguration config = createConfiguration();