Posted to issues@nifi.apache.org by "Joseph Witt (JIRA)" <ji...@apache.org> on 2016/07/27 01:56:20 UTC

[jira] [Commented] (NIFI-2395) PersistentProvenanceRepository Deadlocks caused by a blocked journal merge

    [ https://issues.apache.org/jira/browse/NIFI-2395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15394927#comment-15394927 ] 

Joseph Witt commented on NIFI-2395:
-----------------------------------

Raising to blocker until this can either be proven invalid or the underlying issue can be identified.  The analysis is quite reasonable, so there is clearly something at play.  A thread dump would be very helpful.

> PersistentProvenanceRepository Deadlocks caused by a blocked journal merge
> --------------------------------------------------------------------------
>
>                 Key: NIFI-2395
>                 URL: https://issues.apache.org/jira/browse/NIFI-2395
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>    Affects Versions: 0.6.0, 0.7.0
>            Reporter: Brian Davis
>            Assignee: Joseph Witt
>            Priority: Blocker
>
> I have a NiFi instance that has been running for about a week and has deadlocked at least 3 times in that period.  By deadlock I mean that the whole NiFi instance stops making any progress on flowfiles.  I looked at the stack trace and many threads are stuck doing tasks in the PersistentProvenanceRepository.  Looking at the code, I think this is what is happening:
> There is a ReadWriteLock on which all the readers are waiting for a writer.  The writing thread is in this loop:
> {code}
>                 while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
>                     // if a shutdown happens while we are in this loop, kill the rollover thread and break
>                     if (this.closed.get()) {
>                         if (future != null) {
>                             future.cancel(true);
>                         }
>                         break;
>                     }
>                     if (repoSize > sizeThreshold) {
>                         logger.debug("Provenance Repository has exceeded its size threshold; will trigger purging of oldest events");
>                         purgeOldEvents();
>                         journalFileCount = getJournalCount();
>                         repoSize = getSize(getLogFiles(), 0L);
>                         continue;
>                     } else {
>                         // if we are constrained by the number of journal files rather than the size of the repo,
>                         // then we will just sleep a bit because another thread is already actively merging the journals,
>                         // due to the runnable that we scheduled above
>                         try {
>                             Thread.sleep(100L);
>                         } catch (final InterruptedException ie) {
>                         }
>                     }
>                     logger.debug("Provenance Repository is still behind. Keeping flow slowed down "
>                             + "to accommodate. Currently, there are {} journal files ({} bytes) and "
>                             + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
>                     journalFileCount = getJournalCount();
>                     repoSize = getSize(getLogFiles(), 0L);
>                 }
>                 logger.info("Provenance Repository has now caught up with rolling over journal files. Current number of "
>                         + "journal files to be rolled over is {}", journalFileCount);
>             }
> {code}
> My NiFi sits in the sleep indefinitely.  It cannot move forward because the thread doing the merge is stalled.  That thread is at:
> {code}
> accepted = eventQueue.offer(new Tuple<>(record, blockIndex), 10, TimeUnit.MILLISECONDS);
> {code}
> so the queue is full.  
> What I believe happened is that the callables created here:
> {code}
>                             final Callable<Object> callable = new Callable<Object>() {
>                                 @Override
>                                 public Object call() throws IOException {
>                                     while (!eventQueue.isEmpty() || !finishedAdding.get()) {
>                                         final Tuple<StandardProvenanceEventRecord, Integer> tuple;
>                                         try {
>                                             tuple = eventQueue.poll(10, TimeUnit.MILLISECONDS);
>                                         } catch (final InterruptedException ie) {
>                                             continue;
>                                         }
>                                         if (tuple == null) {
>                                             continue;
>                                         }
>                                         indexingAction.index(tuple.getKey(), indexWriter, tuple.getValue());
>                                     }
>                                     return null;
>                                 }
> {code}
> finished before the offer added its first event, because I do not see any Index Provenance Events threads.  My guess is that the while loop condition is wrong and should use && instead of ||.
> I raised the thread count for index creation from 1 to 3 to see whether that helps.  I can report back on the result later this week.
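
The two mechanisms described above (the consumer loop condition and the timed offer against a full queue) can be checked in isolation with a small sketch. This is not NiFi code: the class MergeQueueSketch and the methods keepRunningWithOr, keepRunningWithAnd, and offerWithRetries are hypothetical names introduced for illustration, and a capacity-1 ArrayBlockingQueue of strings stands in for eventQueue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; all names here are hypothetical, not NiFi's.
class MergeQueueSketch {

    // Condition as written in the quoted callable: keep consuming while events
    // remain queued OR the producer has not finished adding.
    static boolean keepRunningWithOr(boolean queueEmpty, boolean finishedAdding) {
        return !queueEmpty || !finishedAdding;
    }

    // Variant proposed in the report: '&&' in place of '||'.
    static boolean keepRunningWithAnd(boolean queueEmpty, boolean finishedAdding) {
        return !queueEmpty && !finishedAdding;
    }

    // Stand-in for the merge thread: retries a 10 ms timed offer, but gives up
    // after maxAttempts so that the sketch terminates.
    static boolean offerWithRetries(BlockingQueue<String> queue, String event, int maxAttempts) {
        try {
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                if (queue.offer(event, 10, TimeUnit.MILLISECONDS)) {
                    return true;
                }
            }
        } catch (InterruptedException ie) {
            // Restore the interrupt flag and report the event as not accepted.
            Thread.currentThread().interrupt();
        }
        return false;
    }

    public static void main(String[] args) {
        // Startup state: nothing queued yet, producer not finished.
        System.out.println("|| at startup: " + keepRunningWithOr(true, false));
        System.out.println("&& at startup: " + keepRunningWithAnd(true, false));

        // Full queue with no consumer draining it: every timed offer fails.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        System.out.println("first offer accepted: " + offerWithRetries(queue, "event-1", 3));
        System.out.println("second offer accepted: " + offerWithRetries(queue, "event-2", 3));
    }
}
```

In this sketch the || form keeps the consumer alive at startup and stops it only once the queue is empty and the producer has finished, while the && form would stop the consumer as soon as the queue is empty. The second offer is never accepted because nothing drains the queue, which matches the stalled merge thread described above. If the indexing threads really are gone, another possibility worth checking in a thread dump or the logs is an exception propagating out of call(), since the quoted callable does not catch anything thrown by indexingAction.index(...).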


