You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2016/07/29 13:34:18 UTC

nifi git commit: NIFI-2395 This closes #734. Ensure that if we fail to index provenance events we do not prevent the repo from continuing to merge journals

Repository: nifi
Updated Branches:
  refs/heads/master cddbe7d41 -> cfc8a9613


NIFI-2395 This closes #734. Ensure that if we fail to index provenance events we do not prevent the repo from continuing to merge journals


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/cfc8a961
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/cfc8a961
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/cfc8a961

Branch: refs/heads/master
Commit: cfc8a9613cb071247ef22f8fe4a3abb4e6b83151
Parents: cddbe7d
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Jul 28 10:19:45 2016 -0400
Committer: joewitt <jo...@apache.org>
Committed: Fri Jul 29 09:33:47 2016 -0400

----------------------------------------------------------------------
 .../PersistentProvenanceRepository.java         | 64 ++++++++++++++++----
 .../TestPersistentProvenanceRepository.java     | 54 +++++++++++++++++
 2 files changed, 106 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/cfc8a961/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 7e8b9a6..aee8277 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -123,6 +123,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
     public static final Pattern INDEX_PATTERN = Pattern.compile("index-\\d+");
     public static final Pattern LOG_FILENAME_PATTERN = Pattern.compile("(\\d+).*\\.prov");
     public static final int MAX_UNDELETED_QUERY_RESULTS = 10;
+    public static final int MAX_INDEXING_FAILURE_COUNT = 5; // how many indexing failures we will tolerate before skipping indexing for a prov file
 
     private static final Logger logger = LoggerFactory.getLogger(PersistentProvenanceRepository.class);
 
@@ -1648,7 +1649,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
             try (final RecordWriter writer = RecordWriters.newRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) {
                 writer.writeHeader(minEventId);
 
-                final IndexingAction indexingAction = new IndexingAction(this);
+                final IndexingAction indexingAction = createIndexingAction();
 
                 final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile, earliestTimestamp);
                 long maxId = 0L;
@@ -1668,24 +1669,33 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
                         }
                     });
 
+                    final AtomicInteger indexingFailureCount = new AtomicInteger(0);
                     try {
                         for (int i = 0; i < configuration.getIndexThreadPoolSize(); i++) {
                             final Callable<Object> callable = new Callable<Object>() {
                                 @Override
                                 public Object call() throws IOException {
                                     while (!eventQueue.isEmpty() || !finishedAdding.get()) {
-                                        final Tuple<StandardProvenanceEventRecord, Integer> tuple;
                                         try {
-                                            tuple = eventQueue.poll(10, TimeUnit.MILLISECONDS);
-                                        } catch (final InterruptedException ie) {
-                                            continue;
+                                            final Tuple<StandardProvenanceEventRecord, Integer> tuple;
+                                            try {
+                                                tuple = eventQueue.poll(10, TimeUnit.MILLISECONDS);
+                                            } catch (final InterruptedException ie) {
+                                                Thread.currentThread().interrupt();
+                                                continue;
+                                            }
+
+                                            if (tuple == null) {
+                                                continue;
+                                            }
+
+                                            indexingAction.index(tuple.getKey(), indexWriter, tuple.getValue());
+                                        } catch (final Throwable t) {
+                                            logger.error("Failed to index Provenance Event for " + writerFile + " to " + indexingDirectory, t);
+                                            if (indexingFailureCount.incrementAndGet() >= MAX_INDEXING_FAILURE_COUNT) {
+                                                return null;
+                                            }
                                         }
-
-                                        if (tuple == null) {
-                                            continue;
-                                        }
-
-                                        indexingAction.index(tuple.getKey(), indexWriter, tuple.getValue());
                                     }
 
                                     return null;
@@ -1696,6 +1706,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
                             futures.add(future);
                         }
 
+                        boolean indexEvents = true;
                         while (!recordToReaderMap.isEmpty()) {
                             final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
                             final StandardProvenanceEventRecord record = entry.getKey();
@@ -1705,12 +1716,30 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
                             final int blockIndex = writer.getTocWriter().getCurrentBlockIndex();
 
                             boolean accepted = false;
-                            while (!accepted) {
+                            while (!accepted && indexEvents) {
                                 try {
                                     accepted = eventQueue.offer(new Tuple<>(record, blockIndex), 10, TimeUnit.MILLISECONDS);
                                 } catch (final InterruptedException ie) {
+                                    Thread.currentThread().interrupt();
+                                }
+
+                                // If we weren't able to add anything to the queue, check if we have reached our max failure count.
+                                // We do this here because if we do reach our max failure count, all of the indexing threads will stop
+                                // performing their jobs. As a result, the queue will fill and we won't be able to add anything to it.
+                                // So, if the queue is filled, we will check if this is the case.
+                                if (!accepted && indexingFailureCount.get() >= MAX_INDEXING_FAILURE_COUNT) {
+                                    indexEvents = false;  // don't add anything else to the queue.
+                                    eventQueue.clear();
+
+                                    final String warning = String.format("Indexing Provenance Events for %s has failed %s times. This exceeds the maximum threshold of %s failures, "
+                                        + "so no more Provenance Events will be indexed for this Provenance file.", writerFile, indexingFailureCount.get(), MAX_INDEXING_FAILURE_COUNT);
+                                    logger.warn(warning);
+                                    if (eventReporter != null) {
+                                        eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, warning);
+                                    }
                                 }
                             }
+
                             maxId = record.getEventId();
 
                             latestRecords.add(truncateAttributes(record));
@@ -1747,6 +1776,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
 
                             throw new RuntimeException(t);
                         } catch (final InterruptedException e) {
+                            Thread.currentThread().interrupt();
                             throw new RuntimeException("Thread interrupted");
                         }
                     }
@@ -1810,6 +1840,15 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
         return writerFile;
     }
 
+    /**
+     * Exists (and is protected) for testing purposes: unit tests can extend this class and override this method
+     * in order to mock out the Indexing Action so that it throws Exceptions, counts events indexed, etc.
+     * @return a new IndexingAction constructed with this repository instance
+     */
+    protected IndexingAction createIndexingAction() {
+        return new IndexingAction(this);
+    }
+
     private StandardProvenanceEventRecord truncateAttributes(final StandardProvenanceEventRecord original) {
         boolean requireTruncation = false;
 
@@ -2264,6 +2303,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
         throw new AccessDeniedException("Cannot retrieve Provenance Query Submission because " + user.getIdentity() + " is not the user who submitted the request");
     }
 
+    @Override
     public ProvenanceEventRecord getEvent(final long id) throws IOException {
         final List<ProvenanceEventRecord> records = getEvents(id, 1);
         if (records.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/cfc8a961/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index c4fe6ed..d7738e7 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -20,6 +20,7 @@ import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.core.SimpleAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.queryparser.classic.ParseException;
 import org.apache.lucene.queryparser.classic.QueryParser;
 import org.apache.lucene.search.IndexSearcher;
@@ -35,6 +36,7 @@ import org.apache.nifi.provenance.lineage.Lineage;
 import org.apache.nifi.provenance.lineage.LineageEdge;
 import org.apache.nifi.provenance.lineage.LineageNode;
 import org.apache.nifi.provenance.lineage.LineageNodeType;
+import org.apache.nifi.provenance.lucene.IndexingAction;
 import org.apache.nifi.provenance.search.Query;
 import org.apache.nifi.provenance.search.QueryResult;
 import org.apache.nifi.provenance.search.QuerySubmission;
@@ -1536,6 +1538,58 @@ public class TestPersistentProvenanceRepository {
     }
 
 
+    @Test(timeout=5000) // JUnit fails the test if it runs longer than 5000 milliseconds
+    public void testExceptionOnIndex() throws IOException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxAttributeChars(50);
+        config.setMaxEventFileLife(3, TimeUnit.SECONDS);
+        config.setIndexThreadPoolSize(1); // NOTE(review): presumably a single indexing thread keeps the index() call count deterministic - confirm
+
+        final int numEventsToIndex = 10; // number of index() calls that succeed before the mock starts throwing
+
+        final AtomicInteger indexedEventCount = new AtomicInteger(0); // counts every index() invocation, successful or not
+        repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
+            @Override
+            protected synchronized IndexingAction createIndexingAction() {
+                return new IndexingAction(repo) {
+                    @Override
+                    public void index(StandardProvenanceEventRecord record, IndexWriter indexWriter, Integer blockIndex) throws IOException {
+                        final int count = indexedEventCount.incrementAndGet();
+                        if (count <= numEventsToIndex) {
+                            return; // the first numEventsToIndex calls return without writing to the IndexWriter
+                        }
+
+                        throw new IOException("Unit Test - Intentional Exception"); // every later call fails
+                    }
+                };
+            }
+        };
+        repo.initialize(getEventReporter(), null, null);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        for (int i=0; i < 1000; i++) { // registers far more events than the mocked index() will accept
+            final ProvenanceEventRecord record = builder.build();
+            repo.registerEvent(record);
+        }
+
+        repo.waitForRollover(); // NOTE(review): presumably blocks until the rollover (including indexing) has completed - confirm
+
+        assertEquals(numEventsToIndex + PersistentProvenanceRepository.MAX_INDEXING_FAILURE_COUNT, indexedEventCount.get());
+        assertEquals(1, reportedEvents.size()); // exactly one event is expected to have been reported
+        final ReportedEvent event = reportedEvents.get(0);
+        assertEquals(Severity.WARNING, event.getSeverity());
+    }
+
     @Test
     public void testFailureToCreateWriterDoesNotPreventSubsequentRollover() throws IOException, InterruptedException {
         final RepositoryConfiguration config = createConfiguration();