You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/11/07 21:28:48 UTC

nifi git commit: NIFI-2778 added abilty to interrupt Lucene search

Repository: nifi
Updated Branches:
  refs/heads/master 6aefc0b91 -> 5fd4a5579


NIFI-2778 added abilty to interrupt Lucene search

polishing

This closes #1138


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5fd4a557
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5fd4a557
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5fd4a557

Branch: refs/heads/master
Commit: 5fd4a55791da27fdba577636ac985a294618328a
Parents: 6aefc0b
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Fri Oct 14 10:50:29 2016 -0400
Committer: Matt Burgess <ma...@apache.org>
Committed: Mon Nov 7 16:28:16 2016 -0500

----------------------------------------------------------------------
 .../nifi/provenance/AsyncQuerySubmission.java     | 12 ++++++++++++
 .../PersistentProvenanceRepository.java           |  2 +-
 .../provenance/lucene/SimpleIndexManager.java     | 18 +++++++++++++++++-
 3 files changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/5fd4a557/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java
index cd2ab39..66858b4 100644
--- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java
+++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java
@@ -16,7 +16,10 @@
  */
 package org.apache.nifi.provenance;
 
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.provenance.search.Query;
@@ -33,6 +36,8 @@ public class AsyncQuerySubmission implements QuerySubmission {
     private final StandardQueryResult queryResult;
     private final String submitterId;
 
+    private final List<Future<?>> queryExecutions = new ArrayList<>();
+
     /**
      * Constructs an AsyncQuerySubmission with the given query and the given
      * number of steps, indicating how many results must be added to this
@@ -65,6 +70,9 @@ public class AsyncQuerySubmission implements QuerySubmission {
     @Override
     public void cancel() {
         this.canceled = true;
+        for (Future<?> queryExecution : this.queryExecutions) {
+            queryExecution.cancel(true);
+        }
         queryResult.cancel();
     }
 
@@ -82,4 +90,8 @@ public class AsyncQuerySubmission implements QuerySubmission {
     public StandardQueryResult getResult() {
         return queryResult;
     }
+
+    public void addQueryExecution(Future<?> execution) {
+        this.queryExecutions.add(execution);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/5fd4a557/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 0788716..f70bf7d 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -2021,7 +2021,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
             result.getResult().update(Collections.<ProvenanceEventRecord>emptyList(), 0L);
         } else {
             for (final File indexDir : indexDirectories) {
-                queryExecService.submit(new QueryRunnable(query, result, user, indexDir, retrievalCount));
+                result.addQueryExecution(queryExecService.submit(new QueryRunnable(query, result, user, indexDir, retrievalCount)));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/5fd4a557/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
index daf6413..9e3bacd 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
@@ -26,6 +26,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
@@ -44,8 +47,21 @@ public class SimpleIndexManager implements IndexManager {
     private final ConcurrentMap<Object, List<Closeable>> closeables = new ConcurrentHashMap<>();
     private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
 
+    private final ExecutorService searchExecutor = Executors.newCachedThreadPool();
+
+
     @Override
     public void close() throws IOException {
+        logger.debug("Shutting down SimpleIndexManager search executor");
+        this.searchExecutor.shutdown();
+        try {
+            if (!this.searchExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+                this.searchExecutor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            this.searchExecutor.shutdownNow();
+        }
     }
 
     @Override
@@ -53,7 +69,7 @@ public class SimpleIndexManager implements IndexManager {
         logger.debug("Creating index searcher for {}", indexDir);
         final Directory directory = FSDirectory.open(indexDir);
         final DirectoryReader directoryReader = DirectoryReader.open(directory);
-        final IndexSearcher searcher = new IndexSearcher(directoryReader);
+        final IndexSearcher searcher = new IndexSearcher(directoryReader, this.searchExecutor);
 
         final List<Closeable> closeableList = new ArrayList<>(2);
         closeableList.add(directoryReader);