You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/11/07 21:28:48 UTC
nifi git commit: NIFI-2778 added ability to interrupt Lucene search
Repository: nifi
Updated Branches:
refs/heads/master 6aefc0b91 -> 5fd4a5579
NIFI-2778 added ability to interrupt Lucene search
polishing
This closes #1138
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5fd4a557
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5fd4a557
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5fd4a557
Branch: refs/heads/master
Commit: 5fd4a55791da27fdba577636ac985a294618328a
Parents: 6aefc0b
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Fri Oct 14 10:50:29 2016 -0400
Committer: Matt Burgess <ma...@apache.org>
Committed: Mon Nov 7 16:28:16 2016 -0500
----------------------------------------------------------------------
.../nifi/provenance/AsyncQuerySubmission.java | 12 ++++++++++++
.../PersistentProvenanceRepository.java | 2 +-
.../provenance/lucene/SimpleIndexManager.java | 18 +++++++++++++++++-
3 files changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/5fd4a557/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java
index cd2ab39..66858b4 100644
--- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java
+++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java
@@ -16,7 +16,10 @@
*/
package org.apache.nifi.provenance;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.provenance.search.Query;
@@ -33,6 +36,8 @@ public class AsyncQuerySubmission implements QuerySubmission {
private final StandardQueryResult queryResult;
private final String submitterId;
+ private final List<Future<?>> queryExecutions = new ArrayList<>();
+
/**
* Constructs an AsyncQuerySubmission with the given query and the given
* number of steps, indicating how many results must be added to this
@@ -65,6 +70,9 @@ public class AsyncQuerySubmission implements QuerySubmission {
@Override
public void cancel() {
this.canceled = true;
+ for (Future<?> queryExecution : this.queryExecutions) {
+ queryExecution.cancel(true);
+ }
queryResult.cancel();
}
@@ -82,4 +90,8 @@ public class AsyncQuerySubmission implements QuerySubmission {
public StandardQueryResult getResult() {
return queryResult;
}
+
+ public void addQueryExecution(Future<?> execution) {
+ this.queryExecutions.add(execution);
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/5fd4a557/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 0788716..f70bf7d 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -2021,7 +2021,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
result.getResult().update(Collections.<ProvenanceEventRecord>emptyList(), 0L);
} else {
for (final File indexDir : indexDirectories) {
- queryExecService.submit(new QueryRunnable(query, result, user, indexDir, retrievalCount));
+ result.addQueryExecution(queryExecService.submit(new QueryRunnable(query, result, user, indexDir, retrievalCount)));
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/5fd4a557/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
index daf6413..9e3bacd 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
@@ -26,6 +26,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
@@ -44,8 +47,21 @@ public class SimpleIndexManager implements IndexManager {
private final ConcurrentMap<Object, List<Closeable>> closeables = new ConcurrentHashMap<>();
private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
+ private final ExecutorService searchExecutor = Executors.newCachedThreadPool();
+
+
@Override
public void close() throws IOException {
+ logger.debug("Shutting down SimpleIndexManager search executor");
+ this.searchExecutor.shutdown();
+ try {
+ if (!this.searchExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+ this.searchExecutor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ this.searchExecutor.shutdownNow();
+ }
}
@Override
@@ -53,7 +69,7 @@ public class SimpleIndexManager implements IndexManager {
logger.debug("Creating index searcher for {}", indexDir);
final Directory directory = FSDirectory.open(indexDir);
final DirectoryReader directoryReader = DirectoryReader.open(directory);
- final IndexSearcher searcher = new IndexSearcher(directoryReader);
+ final IndexSearcher searcher = new IndexSearcher(directoryReader, this.searchExecutor);
final List<Closeable> closeableList = new ArrayList<>(2);
closeableList.add(directoryReader);