You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/27 21:47:34 UTC
[07/12] incubator-nifi git commit: continuing to implement
continuing to implement
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a5f557ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a5f557ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a5f557ad
Branch: refs/heads/journaling-prov-repo
Commit: a5f557ad966c4fa70ae0a0239e3bf70dcd788ff0
Parents: b95e756
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Feb 15 10:45:29 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Sun Feb 15 10:45:29 2015 -0500
----------------------------------------------------------------------
.../JournalingProvenanceRepository.java | 30 ++-
.../config/JournalingRepositoryConfig.java | 5 +
.../journaling/index/EventIndexSearcher.java | 2 +
.../journaling/index/IndexManager.java | 41 +++
.../journaling/index/LuceneIndexManager.java | 178 +++++++++++++
.../journaling/index/LuceneIndexSearcher.java | 23 ++
.../journaling/index/LuceneIndexWriter.java | 7 +-
.../journaling/index/MultiIndexSearcher.java | 112 ++++++++
.../journals/StandardJournalWriter.java | 40 ++-
.../partition/JournalingPartition.java | 254 ++++++++++---------
.../journaling/partition/PartitionManager.java | 13 +
.../partition/QueuingPartitionManager.java | 16 +-
.../journaling/tasks/CompressionTask.java | 2 +-
.../journaling/toc/StandardTocWriter.java | 25 +-
14 files changed, 607 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
index 7911d73..cc97ee9 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
@@ -30,9 +30,10 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -47,6 +48,8 @@ import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.provenance.StorageLocation;
import org.apache.nifi.provenance.StoredProvenanceEvent;
import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
+import org.apache.nifi.provenance.journaling.index.IndexManager;
+import org.apache.nifi.provenance.journaling.index.LuceneIndexManager;
import org.apache.nifi.provenance.journaling.index.QueryUtils;
import org.apache.nifi.provenance.journaling.journals.JournalReader;
import org.apache.nifi.provenance.journaling.journals.StandardJournalReader;
@@ -75,11 +78,12 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
private final JournalingRepositoryConfig config;
private final AtomicLong idGenerator = new AtomicLong(0L);
- private final ExecutorService executor;
+ private final ScheduledExecutorService executor;
private EventReporter eventReporter; // effectively final
private PartitionManager partitionManager; // effectively final
private QueryManager queryManager; // effectively final
+ private IndexManager indexManager; // effectively final
public JournalingProvenanceRepository() throws IOException {
this(createConfig());
@@ -87,7 +91,16 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
public JournalingProvenanceRepository(final JournalingRepositoryConfig config) throws IOException {
+ // Stores the configuration and creates the worker pool; the managers themselves are
+ // not created here but in initialize(EventReporter).
this.config = config;
- this.executor = Executors.newFixedThreadPool(config.getThreadPoolSize());
+ // The pool is a scheduled one (sized by config.getThreadPoolSize()) because it is handed to
+ // LuceneIndexManager, which registers periodic tasks on it via scheduleWithFixedDelay.
+ this.executor = Executors.newScheduledThreadPool(config.getThreadPoolSize(), new ThreadFactory() {
+ // Threads are created by the JDK default factory and then only renamed.
+ private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
+
+ @Override
+ public Thread newThread(final Runnable r) {
+ final Thread thread = defaultFactory.newThread(r);
+ // Every thread of the pool receives this same name (no per-thread suffix).
+ thread.setName("Provenance Repository Worker Thread");
+ return thread;
+ }
+ });
}
@@ -156,7 +169,8 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
public synchronized void initialize(final EventReporter eventReporter) throws IOException {
+ // Records the event reporter and builds the index, partition and query managers
+ // from the configuration and executor that the constructor set up.
this.eventReporter = eventReporter;
- this.partitionManager = new QueuingPartitionManager(config, executor);
+ // The index manager is created first because the partition manager takes it as a constructor argument.
+ this.indexManager = new LuceneIndexManager(config, executor);
+ this.partitionManager = new QueuingPartitionManager(indexManager, config, executor);
this.queryManager = new StandardQueryManager(partitionManager, config, 10);
}
@@ -312,7 +326,7 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
@Override
public Long getMaxEventId() throws IOException {
- final Set<Long> maxIds = partitionManager.withEachPartition(new PartitionAction<Long>() {
+ final Set<Long> maxIds = partitionManager.withEachPartitionSerially(new PartitionAction<Long>() {
@Override
public Long perform(final Partition partition) throws IOException {
return partition.getMaxEventId();
@@ -374,6 +388,10 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
partitionManager.shutdown();
}
+ indexManager.close();
+
+ // TODO: make sure that all are closed here!
+
executor.shutdown();
}
@@ -390,7 +408,7 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
@Override
public Long getEarliestEventTime() throws IOException {
// Get the earliest event timestamp for each partition
- final Set<Long> earliestTimes = partitionManager.withEachPartition(new PartitionAction<Long>() {
+ final Set<Long> earliestTimes = partitionManager.withEachPartitionSerially(new PartitionAction<Long>() {
@Override
public Long perform(final Partition partition) throws IOException {
return partition.getEarliestEventTime();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
index 8998932..18871c7 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
@@ -35,6 +35,7 @@ public class JournalingRepositoryConfig {
private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB
private int partitionCount = 16;
private int blockSize = 5000;
+ private int indexesPerContainer = 2;
private List<SearchableField> searchableFields = new ArrayList<>();
private List<SearchableField> searchableAttributes = new ArrayList<>();
@@ -51,6 +52,10 @@ public class JournalingRepositoryConfig {
return readOnly;
}
+ /**
+ * Returns the number of indexes kept for each container. The backing field is
+ * initialised to 2; no setter for it is visible in this change.
+ *
+ * @return the number of indexes per container
+ */
+ public int getIndexesPerContainer() {
+ return indexesPerContainer;
+ }
+
/**
* Specifies where the repository will store data
*
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
index b669c53..753378d 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
@@ -33,4 +33,6 @@ public interface EventIndexSearcher extends Closeable {
SearchResult search(Query query) throws IOException;
List<JournaledStorageLocation> getEvents(long minEventId, int maxResults) throws IOException;
+
+ Long getMaxEventId(String container, String section) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java
new file mode 100644
index 0000000..141b84a
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.index;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Hands out index writers and index searchers on a per-container basis and reports
+ * the highest event ID that has been indexed. Being {@link Closeable}, an
+ * implementation is expected to release what it holds in {@code close()}.
+ */
+public interface IndexManager extends Closeable {
+
+ /**
+ * Returns an EventIndexWriter for the given container.
+ *
+ * @param container the name of the container whose index is to be written to
+ * @return a writer for that container
+ */
+ EventIndexWriter getIndexWriter(final String container);
+
+ /**
+ * Returns the max event ID that has been indexed for the given container and section.
+ *
+ * @param container the name of the container to look in
+ * @param section the name of the section to look in
+ * @return the largest indexed event ID; the return type is a boxed Long and
+ * LuceneIndexManager returns null when it has no value to report
+ * @throws IOException if the index cannot be read
+ */
+ Long getMaxEventId(String container, String section) throws IOException;
+
+ /**
+ * Creates a searcher for the named container.
+ * NOTE(review): EventIndexSearcher is Closeable; presumably the caller is
+ * responsible for closing the returned searcher - confirm.
+ *
+ * @param containerName the name of the container to search
+ * @return a searcher for that container
+ * @throws IOException if the searcher cannot be created
+ */
+ EventIndexSearcher newIndexSearcher(String containerName) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
new file mode 100644
index 0000000..d10fd00
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.index;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link IndexManager} that keeps {@link JournalingRepositoryConfig#getIndexesPerContainer()}
+ * Lucene indexes for every configured container, stored in the container's
+ * {@code indices/0}, {@code indices/1}, ... directories.
+ * <p>
+ * Unless the repository is read-only, the constructor opens one writer per index and
+ * schedules, per container, a periodic task that advances the container's writer counter
+ * and syncs the writer the counter pointed at before the advance. In read-only mode no
+ * writers are opened and searchers are created directly on the index directories.
+ * </p>
+ */
+public class LuceneIndexManager implements IndexManager {
+    private static final Logger logger = LoggerFactory.getLogger(LuceneIndexManager.class);
+
+    private final JournalingRepositoryConfig config;
+    private final ScheduledExecutorService executor;
+
+    // Both maps are keyed by container name and are populated only by the constructor.
+    private final Map<String, List<LuceneIndexWriter>> writers = new HashMap<>();
+    private final Map<String, AtomicLong> writerIndexes = new HashMap<>();
+
+    /**
+     * Opens the index writers and schedules the periodic sync task for each container,
+     * unless the configuration is read-only.
+     *
+     * @param config the repository configuration (containers, indexes per container, rollover period)
+     * @param executor the executor on which the periodic sync tasks are scheduled
+     * @throws IOException presumably when an index writer cannot be opened
+     */
+    public LuceneIndexManager(final JournalingRepositoryConfig config, final ScheduledExecutorService executor) throws IOException {
+        this.config = config;
+        this.executor = executor;
+
+        // The journal rollover period doubles as both the initial delay and the delay between syncs.
+        final int rolloverSeconds = (int) config.getJournalRolloverPeriod(TimeUnit.SECONDS);
+        if ( !config.isReadOnly() ) {
+            for ( final Map.Entry<String, File> entry : config.getContainers().entrySet() ) {
+                final String containerName = entry.getKey();
+                final File container = entry.getValue();
+
+                final List<LuceneIndexWriter> writerList = new ArrayList<>(config.getIndexesPerContainer());
+                writers.put(containerName, writerList);
+                writerIndexes.put(containerName, new AtomicLong(0L));
+
+                for ( int i=0; i < config.getIndexesPerContainer(); i++ ){
+                    writerList.add(new LuceneIndexWriter(getIndexDirectory(container, i), config));
+                }
+
+                executor.scheduleWithFixedDelay(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            sync(containerName);
+                        } catch (final Throwable t) {
+                            // Catch everything: an exception escaping a scheduled task cancels its future runs.
+                            logger.error("Failed to sync Provenance Repository Container {} due to {}", containerName, t);
+                            if ( logger.isDebugEnabled() ) {
+                                logger.error("", t);
+                            }
+                        }
+                    }
+                }, rolloverSeconds, rolloverSeconds, TimeUnit.SECONDS);
+            }
+        }
+    }
+
+    /**
+     * Resolves the directory of the index with the given number inside a container directory.
+     * Shared by the writer and the read-only searcher paths so that both use the same location.
+     */
+    private static File getIndexDirectory(final File containerDir, final int index) {
+        return new File(containerDir, "indices/" + index);
+    }
+
+    /**
+     * Creates a searcher that spans all indexes of the named container.
+     * In read-only mode the searchers are opened directly on the index directories;
+     * otherwise each of the container's writers supplies one.
+     *
+     * @param containerName the name of a configured container
+     * @return a {@link MultiIndexSearcher} wrapping one searcher per index
+     * @throws IllegalArgumentException if no container with that name is configured
+     * @throws IOException if a searcher cannot be created; searchers opened so far are closed first
+     */
+    @Override
+    public EventIndexSearcher newIndexSearcher(final String containerName) throws IOException {
+        final File containerDir = config.getContainers().get(containerName);
+        if ( containerDir == null ) {
+            throw new IllegalArgumentException("No Provenance Repository container with name " + containerName);
+        }
+
+        final List<EventIndexSearcher> searchers = new ArrayList<>();
+
+        try {
+            if (config.isReadOnly()) {
+                for (int i=0; i < config.getIndexesPerContainer(); i++) {
+                    // Resolve against the container's directory, not its logical name.
+                    searchers.add(new LuceneIndexSearcher(getIndexDirectory(containerDir, i)));
+                }
+            } else {
+                final List<LuceneIndexWriter> writerList = writers.get(containerName);
+                for ( final LuceneIndexWriter writer : writerList ) {
+                    searchers.add(writer.newIndexSearcher());
+                }
+            }
+        } catch (final IOException ioe) {
+            // If we failed to create a searcher, we need to close all that we've already created.
+            for ( final EventIndexSearcher searcher : searchers ) {
+                try {
+                    searcher.close();
+                } catch (final IOException ioe2) {
+                    ioe.addSuppressed(ioe2);
+                }
+            }
+
+            throw ioe;
+        }
+
+        return new MultiIndexSearcher(searchers);
+    }
+
+    /**
+     * Returns the container's writer selected by the current value of its writer counter
+     * (counter modulo number of writers). The counter is advanced only by the periodic sync.
+     *
+     * @param container the name of a configured container
+     * @return the currently selected writer for that container
+     * @throws IllegalStateException if the repository is read-only
+     * @throws IllegalArgumentException if no writers exist for the given container name
+     */
+    @Override
+    public LuceneIndexWriter getIndexWriter(final String container) {
+        if (config.isReadOnly() ) {
+            throw new IllegalStateException("Cannot obtain Index Writer because repository is read-only");
+        }
+
+        final AtomicLong index = writerIndexes.get(container);
+        if (index == null ) {
+            throw new IllegalArgumentException("No Provenance Repository container with name " + container);
+        }
+
+        final long curVal = index.get();
+        final List<LuceneIndexWriter> writerList = writers.get(container);
+        return writerList.get((int) (curVal % writerList.size()));
+    }
+
+    /**
+     * Asks a fresh searcher on each of the container's writers for its max event ID
+     * and returns the largest answer.
+     *
+     * @param container the container name; also passed through to each searcher
+     * @param section the section name passed through to each searcher
+     * @return the largest ID reported, or null if the container has no writers
+     *         (which includes read-only mode) or no searcher reported an ID
+     * @throws IOException if a searcher cannot be created, queried or closed
+     */
+    @Override
+    public Long getMaxEventId(final String container, final String section) throws IOException {
+        final List<LuceneIndexWriter> writerList = writers.get(container);
+        if ( writerList == null ) {
+            return null;
+        }
+
+        Long max = null;
+        for ( final LuceneIndexWriter writer : writerList ) {
+            try (final EventIndexSearcher searcher = writer.newIndexSearcher()) {
+                final Long maxForWriter = searcher.getMaxEventId(container, section);
+                if ( maxForWriter != null ) {
+                    if (max == null || maxForWriter.longValue() > max.longValue() ) {
+                        max = maxForWriter;
+                    }
+                }
+            }
+        }
+
+        return max;
+    }
+
+
+    /**
+     * Advances the container's writer counter and syncs the writer that the counter
+     * selected before the increment.
+     */
+    private void sync(final String containerName) throws IOException {
+        final AtomicLong index = writerIndexes.get(containerName);
+        final long curIndex = index.getAndIncrement();
+
+        final List<LuceneIndexWriter> writerList = writers.get(containerName);
+        final EventIndexWriter toSync = writerList.get((int) (curIndex % writerList.size()));
+        toSync.sync();
+    }
+
+    /**
+     * Closes every writer of every container. An IOException from one writer is logged
+     * and does not prevent the remaining writers from being closed.
+     */
+    @Override
+    public void close() throws IOException {
+        for ( final List<LuceneIndexWriter> writerList : writers.values() ) {
+            for ( final LuceneIndexWriter writer : writerList ) {
+                try {
+                    writer.close();
+                } catch (final IOException ioe) {
+                    logger.warn("Failed to close {} due to {}", writer, ioe);
+                    if ( logger.isDebugEnabled() ) {
+                        logger.warn("", ioe);
+                    }
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
index 32dc7c3..a9dd1a5 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.IndexSearcher;
@@ -31,6 +32,7 @@ import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortField.Type;
+import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.FSDirectory;
import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
@@ -110,4 +112,25 @@ public class LuceneIndexSearcher implements EventIndexSearcher {
return getLocations(topDocs);
}
+    /**
+     * Looks up the largest event ID among the documents of this index that match the
+     * given container and section.
+     * NOTE(review): when both arguments are null the query is left without any clause -
+     * confirm what result is intended in that case.
+     *
+     * @param container container name to require; no container clause is added when null
+     * @param section section name to require; no section clause is added when null
+     * @return the event ID of the first hit when ordered by event ID descending, or null if there is no hit
+     * @throws IOException if the search fails
+     */
+    @Override
+    public Long getMaxEventId(final String container, final String section) throws IOException {
+        final BooleanQuery filter = new BooleanQuery();
+        if ( container != null ) {
+            filter.add(new TermQuery(new Term(IndexedFieldNames.CONTAINER_NAME, container)), Occur.MUST);
+        }
+        if ( section != null ) {
+            filter.add(new TermQuery(new Term(IndexedFieldNames.SECTION_NAME, section)), Occur.MUST);
+        }
+
+        // The sort field is reversed, so the single hit requested is the one with the highest event ID.
+        final TopDocs highestFirst = searcher.search(filter, 1, new Sort(new SortField(IndexedFieldNames.EVENT_ID, Type.LONG, true)));
+        final List<JournaledStorageLocation> hits = getLocations(highestFirst);
+        if ( !hits.isEmpty() ) {
+            return hits.get(0).getEventId();
+        }
+
+        return null;
+    }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
index e955ae5..b61ad34 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
@@ -18,9 +18,11 @@ package org.apache.nifi.provenance.journaling.index;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -117,6 +119,7 @@ public class LuceneIndexWriter implements EventIndexWriter {
public void index(final Collection<JournaledProvenanceEvent> events) throws IOException {
long maxId = this.indexMaxId.get();
+ final List<Document> documents = new ArrayList<>(events.size());
for ( final JournaledProvenanceEvent event : events ) {
maxId = event.getEventId();
@@ -189,8 +192,10 @@ public class LuceneIndexWriter implements EventIndexWriter {
}
}
- indexWriter.addDocument(doc);
+ documents.add(doc);
}
+
+ indexWriter.addDocuments(documents);
// Update the index's max id
boolean updated = false;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java
new file mode 100644
index 0000000..d086ff5
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.index;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
+import org.apache.nifi.provenance.search.Query;
+
+/**
+ * An {@link EventIndexSearcher} that forwards every request to a list of underlying
+ * searchers and combines what they return.
+ */
+public class MultiIndexSearcher implements EventIndexSearcher {
+    private final List<EventIndexSearcher> searchers;
+
+    /**
+     * @param searchers the searchers to delegate to; the list is kept as given, not copied
+     */
+    public MultiIndexSearcher(final List<EventIndexSearcher> searchers) {
+        this.searchers = searchers;
+    }
+
+    /**
+     * Closes every delegate, continuing past failures. Once all have been attempted the
+     * first IOException is rethrown, carrying any later ones as suppressed exceptions.
+     */
+    @Override
+    public void close() throws IOException {
+        IOException firstFailure = null;
+
+        for ( final EventIndexSearcher delegate : searchers ) {
+            try {
+                delegate.close();
+            } catch (final IOException e) {
+                if ( firstFailure != null ) {
+                    firstFailure.addSuppressed(e);
+                } else {
+                    firstFailure = e;
+                }
+            }
+        }
+
+        if ( firstFailure == null ) {
+            return;
+        }
+
+        throw firstFailure;
+    }
+
+    /**
+     * Runs the query against every delegate.
+     *
+     * @return a result holding all delegates' locations, in delegate order, and the sum of their total counts
+     */
+    @Override
+    public SearchResult search(final Query query) throws IOException {
+        final List<JournaledStorageLocation> merged = new ArrayList<>();
+        int hitCount = 0;
+
+        for ( final EventIndexSearcher delegate : searchers ) {
+            final SearchResult partial = delegate.search(query);
+            hitCount += partial.getTotalCount();
+            merged.addAll(partial.getLocations());
+        }
+
+        return new SearchResult(merged, hitCount);
+    }
+
+    /**
+     * Collects up to {@code maxResults} locations from each delegate and returns at most
+     * {@code maxResults} of the combined list.
+     */
+    @Override
+    public List<JournaledStorageLocation> getEvents(final long minEventId, final int maxResults) throws IOException {
+        final List<JournaledStorageLocation> merged = new ArrayList<>();
+        int delegatesWithHits = 0;
+
+        for ( final EventIndexSearcher delegate : searchers ) {
+            final List<JournaledStorageLocation> found = delegate.getEvents(minEventId, maxResults);
+            merged.addAll(found);
+            if ( found.isEmpty() ) {
+                continue;
+            }
+            delegatesWithHits++;
+        }
+
+        // Each delegate's own answer is already ordered, so re-sorting is only needed
+        // once at least two of them contributed to the combined list.
+        final boolean needsSort = delegatesWithHits >= 2;
+        if ( needsSort ) {
+            Collections.sort(merged);
+        }
+
+        // Trim to the requested size; subList hands back a view of the combined list.
+        if ( merged.size() <= maxResults ) {
+            return merged;
+        }
+
+        return merged.subList(0, maxResults);
+    }
+
+    /**
+     * @return the largest ID reported by any delegate, or null if every delegate reported null
+     */
+    @Override
+    public Long getMaxEventId(final String container, final String section) throws IOException {
+        Long highest = null;
+
+        for ( final EventIndexSearcher delegate : searchers ) {
+            final Long candidate = delegate.getMaxEventId(container, section);
+            if ( candidate == null ) {
+                continue;
+            }
+
+            if ( highest == null || candidate.longValue() > highest.longValue() ) {
+                highest = candidate;
+            }
+        }
+
+        return highest;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
index 5a289fe..af5f8de 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
@@ -16,8 +16,6 @@
*/
package org.apache.nifi.provenance.journaling.journals;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
@@ -29,6 +27,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.journaling.io.Serializer;
import org.apache.nifi.remote.io.CompressionOutputStream;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
@@ -96,6 +96,9 @@ public class StandardJournalWriter implements JournalWriter {
private OutputStream compressedStream;
private ByteCountingOutputStream out;
+ private long recordBytes = 256L;
+ private long recordCount = 1L;
+
public StandardJournalWriter(final long journalId, final File journalFile, final boolean compressed, final Serializer serializer) throws IOException {
this.journalId = journalId;
@@ -132,16 +135,38 @@ public class StandardJournalWriter implements JournalWriter {
@Override
public void close() throws IOException {
+ // Finishes the current block, then closes the compression stream followed by the
+ // underlying stream. A failure on the compression stream is held back so that the
+ // underlying stream is still closed; it is rethrown (or attached as suppressed) afterwards.
finishBlock();
-
- if ( compressedStream != null ) {
+
+ IOException suppressed = null;
+ try {
+ // Keep the null guard the previous code had: without it a null compressedStream raises a
+ // NullPointerException, which the IOException handler below does not catch, and the
+ // underlying stream would then never be closed.
+ if ( compressedStream != null ) {
compressedStream.flush();
compressedStream.close();
+ }
+ } catch (final IOException ioe) {
+ suppressed = ioe;
+ }
+
+ try {
+ try {
+ uncompressedStream.flush();
+ } finally {
+ // Close even when the flush fails.
+ uncompressedStream.close();
+ }
+ } catch (final IOException ioe) {
+ if ( suppressed != null ) {
+ ioe.addSuppressed(suppressed);
+ }
+ throw ioe;
+ }
+
+ if ( suppressed != null ) {
+ throw suppressed;
}
}
@Override
public void write(final Collection<ProvenanceEventRecord> events, final long firstEventId) throws IOException {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final int avgRecordSize = (int) (recordBytes / recordCount);
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream(avgRecordSize);
final DataOutputStream serializerDos = new DataOutputStream(baos);
final BufferedOutputStream bos = new BufferedOutputStream(out);
@@ -153,10 +178,13 @@ public class StandardJournalWriter implements JournalWriter {
serializer.serialize(event, serializerDos);
serializerDos.flush();
- final int recordLength = 8 + baos.size(); // record length is length of ID (8 bytes) plus length of serialized record
+ final int serializedLength = baos.size();
+ final int recordLength = 8 + serializedLength; // record length is length of ID (8 bytes) plus length of serialized record
outDos.writeInt(recordLength);
outDos.writeLong(id++);
baos.writeTo(outDos);
+ recordBytes += recordLength;
+ recordCount++;
baos.reset();
eventCount++;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
index 651c41e..1ace37f 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
@@ -35,8 +35,11 @@ import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent;
import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
import org.apache.nifi.provenance.journaling.index.EventIndexSearcher;
+import org.apache.nifi.provenance.journaling.index.EventIndexWriter;
+import org.apache.nifi.provenance.journaling.index.IndexManager;
import org.apache.nifi.provenance.journaling.index.LuceneIndexSearcher;
import org.apache.nifi.provenance.journaling.index.LuceneIndexWriter;
+import org.apache.nifi.provenance.journaling.index.MultiIndexSearcher;
import org.apache.nifi.provenance.journaling.index.QueryUtils;
import org.apache.nifi.provenance.journaling.io.StandardEventSerializer;
import org.apache.nifi.provenance.journaling.journals.JournalReader;
@@ -55,13 +58,12 @@ public class JournalingPartition implements Partition {
private static final String JOURNAL_FILE_EXTENSION = ".journal";
private final String containerName;
- private final String sectionName;
+ private final int sectionIndex;
private final File section;
private final File journalsDir;
private final JournalingRepositoryConfig config;
private final ExecutorService executor;
- private final LuceneIndexWriter indexWriter;
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
@@ -73,9 +75,12 @@ public class JournalingPartition implements Partition {
private volatile long maxEventId = -1L;
private volatile Long earliestEventTime = null;
- public JournalingPartition(final String containerName, final String sectionName, final File sectionDir, final JournalingRepositoryConfig config, final ExecutorService executor) throws IOException {
+ private final IndexManager indexManager;
+
+ public JournalingPartition(final IndexManager indexManager, final String containerName, final int sectionIndex, final File sectionDir, final JournalingRepositoryConfig config, final ExecutorService executor) throws IOException {
+ this.indexManager = indexManager;
this.containerName = containerName;
- this.sectionName = sectionName;
+ this.sectionIndex = sectionIndex;
this.section = sectionDir;
this.journalsDir = new File(section, "journals");
this.config = config;
@@ -88,22 +93,11 @@ public class JournalingPartition implements Partition {
if ( journalsDir.exists() && journalsDir.isFile() ) {
throw new IOException("Could not create directory " + section + " because a file already exists with this name");
}
-
- if ( config.isReadOnly() ) {
- indexWriter = null;
- } else {
- final File indexDir = new File(section, "index");
- indexWriter = new LuceneIndexWriter(indexDir, config);
- }
}
public EventIndexSearcher newIndexSearcher() throws IOException {
- if (config.isReadOnly()) {
- return new LuceneIndexSearcher(new File(section, "index"));
- }
-
- return indexWriter.newIndexSearcher();
+ return indexManager.newIndexSearcher(containerName);
}
protected JournalWriter getJournalWriter(final long firstEventId) throws IOException {
@@ -118,6 +112,11 @@ public class JournalingPartition implements Partition {
return journalWriter;
}
+ // MUST be called with writeLock or readLock held.
+ private EventIndexWriter getIndexWriter() {
+ return indexManager.getIndexWriter(containerName);
+ }
+
@Override
public List<JournaledProvenanceEvent> registerEvents(final Collection<ProvenanceEventRecord> events, final long firstEventId) throws IOException {
writeLock.lock();
@@ -139,12 +138,13 @@ public class JournalingPartition implements Partition {
final List<JournaledProvenanceEvent> storedEvents = new ArrayList<>(events.size());
long id = firstEventId;
for (final ProvenanceEventRecord event : events) {
- final JournaledStorageLocation location = new JournaledStorageLocation(containerName, sectionName,
+ final JournaledStorageLocation location = new JournaledStorageLocation(containerName, String.valueOf(sectionIndex),
String.valueOf(writer.getJournalId()), tocWriter.getCurrentBlockIndex(), id++);
final JournaledProvenanceEvent storedEvent = new JournaledProvenanceEvent(event, location);
storedEvents.add(storedEvent);
}
+ final EventIndexWriter indexWriter = getIndexWriter();
indexWriter.index(storedEvents);
if ( config.isAlwaysSync() ) {
@@ -196,13 +196,28 @@ public class JournalingPartition implements Partition {
// MUST be called with write lock held.
private void rollover(final long firstEventId) throws IOException {
+ // TODO: Rework how rollover works because we now have index manager!!
+
// if we have a writer already, close it and initiate rollover actions
if ( journalWriter != null ) {
journalWriter.finishBlock();
journalWriter.close();
tocWriter.close();
- indexWriter.sync();
-
+
+ final EventIndexWriter curWriter = getIndexWriter();
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ curWriter.sync();
+ } catch (final IOException e) {
+
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ });
+
if ( config.isCompressOnRollover() ) {
final File finishedFile = journalWriter.getJournalFile();
final File finishedTocFile = tocWriter.getFile();
@@ -213,7 +228,7 @@ public class JournalingPartition implements Partition {
// create new writers and reset state.
final File journalFile = new File(journalsDir, firstEventId + JOURNAL_FILE_EXTENSION);
journalWriter = new StandardJournalWriter(firstEventId, journalFile, false, new StandardEventSerializer());
- tocWriter = new StandardTocWriter(QueryUtils.getTocFile(journalFile), false);
+ tocWriter = new StandardTocWriter(QueryUtils.getTocFile(journalFile), false, config.isAlwaysSync());
tocWriter.addBlockOffset(journalWriter.getSize());
numEventsAtEndOfLastBlock = 0;
}
@@ -237,112 +252,123 @@ public class JournalingPartition implements Partition {
@Override
public void restore() throws IOException {
- // delete or rename files if stopped during rollover; compress any files that haven't been compressed
- if ( !config.isReadOnly() ) {
- final File[] children = journalsDir.listFiles();
- if ( children != null ) {
- // find the latest journal.
- File latestJournal = null;
- long latestJournalId = -1L;
-
- final List<File> journalFiles = new ArrayList<>();
-
- // find any journal files that either haven't been compressed or were partially compressed when
- // we last shutdown and then restart compression.
- for ( final File file : children ) {
- final String filename = file.getName();
- if ( !filename.contains(JOURNAL_FILE_EXTENSION) ) {
- continue;
- }
+ writeLock.lock();
+ try {
+ // delete or rename files if stopped during rollover; compress any files that haven't been compressed
+ if ( !config.isReadOnly() ) {
+ final File[] children = journalsDir.listFiles();
+ if ( children != null ) {
+ // find the latest journal.
+ File latestJournal = null;
+ long latestJournalId = -1L;
+
+ final List<File> journalFiles = new ArrayList<>();
- final Long journalId = getJournalId(file);
- if ( journalId != null && journalId > latestJournalId ) {
- latestJournal = file;
- latestJournalId = journalId;
+ // find any journal files that either haven't been compressed or were partially compressed when
+ // we last shutdown and then restart compression.
+ for ( final File file : children ) {
+ final String filename = file.getName();
+ if ( !filename.contains(JOURNAL_FILE_EXTENSION) ) {
+ continue;
+ }
+
+ final Long journalId = getJournalId(file);
+ if ( journalId != null && journalId > latestJournalId ) {
+ latestJournal = file;
+ latestJournalId = journalId;
+ }
+
+ journalFiles.add(file);
+
+ if ( !config.isCompressOnRollover() ) {
+ continue;
+ }
+
+ if ( filename.endsWith(CompressionTask.FILE_EXTENSION) ) {
+ final File uncompressedFile = new File(journalsDir, filename.replace(CompressionTask.FILE_EXTENSION, ""));
+ if ( uncompressedFile.exists() ) {
+ // both the compressed and uncompressed version of this journal exist. The Compression Task was
+ // not complete when we shutdown. Delete the compressed journal and toc and re-start the Compression Task.
+ final File tocFile = QueryUtils.getTocFile(uncompressedFile);
+ executor.submit(new CompressionTask(uncompressedFile, getJournalId(uncompressedFile), tocFile));
+ } else {
+ // The compressed file exists but the uncompressed file does not. This means that we have finished
+ // writing the compressed file and deleted the original journal file but then shutdown before
+ // renaming the compressed file to the original filename. We can simply rename the compressed file
+ // to the original file and then address the TOC file.
+ final boolean rename = CompressionTask.rename(file, uncompressedFile);
+ if ( !rename ) {
+ logger.warn("{} During recovery, failed to rename {} to {}", this, file, uncompressedFile);
+ continue;
+ }
+
+ // Check if the compressed TOC file exists. If not, we are finished.
+ // If it does exist, then we know that it is complete, as described above, so we will go
+ // ahead and replace the uncompressed version.
+ final File tocFile = QueryUtils.getTocFile(uncompressedFile);
+ final File compressedTocFile = new File(tocFile.getParentFile(), tocFile.getName() + CompressionTask.FILE_EXTENSION);
+ if ( !compressedTocFile.exists() ) {
+ continue;
+ }
+
+ tocFile.delete();
+
+ final boolean renamedTocFile = CompressionTask.rename(compressedTocFile, tocFile);
+ if ( !renamedTocFile ) {
+ logger.warn("{} During recovery, failed to rename {} to {}", this, compressedTocFile, tocFile);
+ }
+ }
+ }
}
- journalFiles.add(file);
+ // Get the first event in the earliest journal file so that we know what the earliest time available is
+ Collections.sort(journalFiles, new Comparator<File>() {
+ @Override
+ public int compare(final File o1, final File o2) {
+ return Long.compare(getJournalId(o1), getJournalId(o2));
+ }
+ });
- if ( !config.isCompressOnRollover() ) {
- continue;
+ for ( final File journal : journalFiles ) {
+ try (final JournalReader reader = new StandardJournalReader(journal)) {
+ final ProvenanceEventRecord record = reader.nextEvent();
+ this.earliestEventTime = record.getEventTime();
+ break;
+ } catch (final IOException ioe) {
+ }
}
- if ( filename.endsWith(CompressionTask.FILE_EXTENSION) ) {
- final File uncompressedFile = new File(journalsDir, filename.replace(CompressionTask.FILE_EXTENSION, ""));
- if ( uncompressedFile.exists() ) {
- // both the compressed and uncompressed version of this journal exist. The Compression Task was
- // not complete when we shutdown. Delete the compressed journal and toc and re-start the Compression Task.
- final File tocFile = QueryUtils.getTocFile(uncompressedFile);
- executor.submit(new CompressionTask(uncompressedFile, getJournalId(uncompressedFile), tocFile));
- } else {
- // The compressed file exists but the uncompressed file does not. This means that we have finished
- // writing the compressed file and deleted the original journal file but then shutdown before
- // renaming the compressed file to the original filename. We can simply rename the compressed file
- // to the original file and then address the TOC file.
- final boolean rename = CompressionTask.rename(file, uncompressedFile);
- if ( !rename ) {
- logger.warn("{} During recovery, failed to rename {} to {}", this, file, uncompressedFile);
- continue;
- }
-
- // Check if the compressed TOC file exists. If not, we are finished.
- // If it does exist, then we know that it is complete, as described above, so we will go
- // ahead and replace the uncompressed version.
- final File tocFile = QueryUtils.getTocFile(uncompressedFile);
- final File compressedTocFile = new File(tocFile.getParentFile(), tocFile.getName() + CompressionTask.FILE_EXTENSION);
- if ( !compressedTocFile.exists() ) {
- continue;
- }
-
- tocFile.delete();
-
- final boolean renamedTocFile = CompressionTask.rename(compressedTocFile, tocFile);
- if ( !renamedTocFile ) {
- logger.warn("{} During recovery, failed to rename {} to {}", this, compressedTocFile, tocFile);
- }
+ // Whatever was the last journal for this partition, we need to remove anything for that journal
+ // from the index and re-add them, and then sync the index. This allows us to avoid syncing
+ // the index each time (we sync only on rollover) but allows us to still ensure that we index
+ // all events.
+ if ( latestJournal != null ) {
+ try {
+ reindex(latestJournal);
+ } catch (final EOFException eof) {
}
}
}
-
- // Get the first event in the earliest journal file so that we know what the earliest time available is
- Collections.sort(journalFiles, new Comparator<File>() {
- @Override
- public int compare(final File o1, final File o2) {
- return Long.compare(getJournalId(o1), getJournalId(o2));
- }
- });
-
- for ( final File journal : journalFiles ) {
- try (final JournalReader reader = new StandardJournalReader(journal)) {
- final ProvenanceEventRecord record = reader.nextEvent();
- this.earliestEventTime = record.getEventTime();
- break;
- } catch (final IOException ioe) {
- }
- }
-
- // Whatever was the last journal for this partition, we need to remove anything for that journal
- // from the index and re-add them, and then sync the index. This allows us to avoid syncing
- // the index each time (we sync only on rollover) but allows us to still ensure that we index
- // all events.
- if ( latestJournal != null ) {
- try {
- reindex(latestJournal);
- } catch (final EOFException eof) {
- }
- }
}
+ } finally {
+ writeLock.unlock();
}
}
private void reindex(final File journalFile) throws IOException {
- try (final TocJournalReader reader = new TocJournalReader(containerName, sectionName, String.valueOf(getJournalId(journalFile)), journalFile)) {
- indexWriter.delete(containerName, sectionName, String.valueOf(getJournalId(journalFile)));
+ // TODO: Rework how recovery works because we now have index manager!!
+ try (final TocJournalReader reader = new TocJournalReader(containerName, String.valueOf(sectionIndex), String.valueOf(getJournalId(journalFile)), journalFile)) {
+ // We don't know which index contains the data for this journal, so remove the journal
+ // from both.
+ for (final LuceneIndexWriter indexWriter : indexWriters ) {
+ indexWriter.delete(containerName, String.valueOf(sectionIndex), String.valueOf(getJournalId(journalFile)));
+ }
long maxId = -1L;
final List<JournaledProvenanceEvent> storedEvents = new ArrayList<>(1000);
JournaledProvenanceEvent event;
+ final LuceneIndexWriter indexWriter = indexWriters[0];
while ((event = reader.nextJournaledEvent()) != null ) {
storedEvents.add(event);
maxId = event.getEventId();
@@ -365,7 +391,7 @@ public class JournalingPartition implements Partition {
@Override
public List<JournaledStorageLocation> getEvents(final long minEventId, final int maxRecords) throws IOException {
- try (final EventIndexSearcher searcher = indexWriter.newIndexSearcher()) {
+ try (final EventIndexSearcher searcher = newIndexSearcher()) {
return searcher.getEvents(minEventId, maxRecords);
}
}
@@ -401,16 +427,6 @@ public class JournalingPartition implements Partition {
}
}
- if ( indexWriter != null ) {
- try {
- indexWriter.close();
- } catch (final IOException ioe) {
- logger.warn("Failed to close {} due to {}", indexWriter, ioe);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
- }
}
@Override
@@ -425,6 +441,6 @@ public class JournalingPartition implements Partition {
@Override
public String toString() {
- return "Partition[section=" + sectionName + "]";
+ return "Partition[section=" + sectionIndex + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
index edbf75b..c0a56c4 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
@@ -55,6 +55,19 @@ public interface PartitionManager {
<T> Set<T> withEachPartition(PartitionAction<T> action) throws IOException;
/**
+ * Performs the given Action on each partition and returns the set of results. Unlike
+ * {@link #withEachPartition(PartitionAction)}, this method does not use the thread pool
+ * in order to perform the request in parallel. This is desirable for very quick functions,
+ * as the thread pool can be fully utilized, resulting in a quick function taking far longer
+ * than it should.
+ *
+ * @param action the action to perform
+ * @param <T> the type of result produced by the action
+ * @return the set of results produced by performing the action on each partition
+ */
+ <T> Set<T> withEachPartitionSerially(PartitionAction<T> action) throws IOException;
+
+ /**
* Performs the given Action to each partition, optionally waiting for the action to complete
* @param action
* @param writeAction
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
index 51d90e2..10af697 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
+import org.apache.nifi.provenance.journaling.index.IndexManager;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +51,7 @@ public class QueuingPartitionManager implements PartitionManager {
private final AtomicInteger blacklistedCount = new AtomicInteger(0);
- public QueuingPartitionManager(final JournalingRepositoryConfig config, final ExecutorService executor) throws IOException {
+ public QueuingPartitionManager(final IndexManager indexManager, final JournalingRepositoryConfig config, final ExecutorService executor) throws IOException {
this.config = config;
this.partitionQueue = new LinkedBlockingQueue<>(config.getPartitionCount());
this.partitionArray = new JournalingPartition[config.getPartitionCount()];
@@ -64,7 +65,7 @@ public class QueuingPartitionManager implements PartitionManager {
final Tuple<String, File> tuple = containerTuples.get(i % containerTuples.size());
final File section = new File(tuple.getValue(), String.valueOf(i));
- final JournalingPartition partition = new JournalingPartition(tuple.getKey(), String.valueOf(i), section, config, executor);
+ final JournalingPartition partition = new JournalingPartition(indexManager, tuple.getKey(), i, section, config, executor);
partitionQueue.offer(partition);
partitionArray[i] = partition;
}
@@ -183,6 +184,17 @@ public class QueuingPartitionManager implements PartitionManager {
}
@Override
+ public <T> Set<T> withEachPartitionSerially(final PartitionAction<T> action) throws IOException {
+ // TODO: Do not use blacklisted partitions.
+ final Set<T> results = new HashSet<>(partitionArray.length);
+ for ( final Partition partition : partitionArray ) {
+ results.add( action.perform(partition) );
+ }
+
+ return results;
+ }
+
+ @Override
public void withEachPartition(final VoidPartitionAction action, final boolean async) {
// TODO: Do not use blacklisted partitions.
final Map<Partition, Future<?>> futures = new HashMap<>(partitionArray.length);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
index c23a405..a6a487b 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
@@ -133,7 +133,7 @@ public class CompressionTask implements Runnable {
try (final JournalReader journalReader = new StandardJournalReader(journalFile);
final JournalWriter compressedWriter = new StandardJournalWriter(journalId, compressedFile, true, new StandardEventSerializer());
final TocReader tocReader = new StandardTocReader(tocFile);
- final TocWriter compressedTocWriter = new StandardTocWriter(compressedTocFile, true)) {
+ final TocWriter compressedTocWriter = new StandardTocWriter(compressedTocFile, true, false)) {
compress(journalReader, compressedWriter, tocReader, compressedTocWriter);
compressedWriter.sync();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
index 6058282..fea6057 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
@@ -40,6 +40,7 @@ public class StandardTocWriter implements TocWriter {
private final File file;
private final FileOutputStream fos;
+ private final boolean alwaysSync;
private int index = 0;
/**
@@ -48,7 +49,7 @@ public class StandardTocWriter implements TocWriter {
* @param compressionFlag whether or not the journal is compressed
* @throws FileNotFoundException
*/
- public StandardTocWriter(final File file, final boolean compressionFlag) throws IOException {
+ public StandardTocWriter(final File file, final boolean compressionFlag, final boolean alwaysSync) throws IOException {
if ( file.exists() ) {
throw new FileAlreadyExistsException(file.getAbsolutePath());
}
@@ -59,11 +60,17 @@ public class StandardTocWriter implements TocWriter {
this.file = file;
fos = new FileOutputStream(file);
-
- fos.write(VERSION);
- fos.write(compressionFlag ? 1 : 0);
+ this.alwaysSync = alwaysSync;
+
+ final byte[] header = new byte[2];
+ header[0] = VERSION;
+ header[1] = (byte) (compressionFlag ? 1 : 0);
+ fos.write(header);
fos.flush();
- fos.getFD().sync();
+
+ if ( alwaysSync ) {
+ fos.getFD().sync();
+ }
}
@Override
@@ -73,7 +80,9 @@ public class StandardTocWriter implements TocWriter {
dos.writeLong(offset);
dos.flush();
- fos.getFD().sync();
+ if ( alwaysSync ) {
+ fos.getFD().sync();
+ }
}
@Override
@@ -83,6 +92,10 @@ public class StandardTocWriter implements TocWriter {
@Override
public void close() throws IOException {
+ if (alwaysSync) {
+ fos.getFD().sync();
+ }
+
fos.close();
}