You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/27 21:47:37 UTC
[10/12] incubator-nifi git commit: NIFI-388: Initial implementation
of prov repo; not yet finished but pushing so that the code is not lost
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
index 18871c7..c1a7de8 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
@@ -28,6 +28,8 @@ import org.apache.nifi.provenance.search.SearchableField;
public class JournalingRepositoryConfig {
private Map<String, File> containers = new HashMap<>();
+ private Map<String, Long> containerCapacities = new HashMap<>();
+
private long expirationMillis = TimeUnit.MILLISECONDS.convert(24, TimeUnit.HOURS);
private long storageCapacity = 1024L * 1024L * 1024L; // 1 GB
private long rolloverMillis = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES);
@@ -35,13 +37,16 @@ public class JournalingRepositoryConfig {
private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB
private int partitionCount = 16;
private int blockSize = 5000;
- private int indexesPerContainer = 2;
-
+ private int indexesPerContainer = 1;
+ private long expirationFrequency = TimeUnit.MINUTES.toNanos(2L);
+
private List<SearchableField> searchableFields = new ArrayList<>();
private List<SearchableField> searchableAttributes = new ArrayList<>();
private boolean compress = true;
private boolean alwaysSync = false;
- private int threadPoolSize = 4;
+ private int workerThreadPoolSize = 2;
+ private int queryThreadPoolSize = 2;
+ private int compressionThreadPoolSize = 1;
private boolean readOnly = false;
public void setReadOnly(final boolean readOnly) {
@@ -74,6 +79,27 @@ public class JournalingRepositoryConfig {
this.containers = new HashMap<>(containers);
}
+
+ public long getMaxCapacity(final String containerName) {
+ final Long maxCapacity = containerCapacities.get(containerName);
+ if ( maxCapacity == null ) {
+ return getMaxStorageCapacity();
+ } else {
+ return maxCapacity;
+ }
+ }
+
+ public void setMaxContainerCapacities(final Map<String, Long> containerCapacities) {
+ this.containerCapacities = new HashMap<>(containerCapacities);
+ }
+
+ public void setMaxContainerCapacity(final String containerName, final long maxCapacity) {
+ if ( maxCapacity < 1 ) {
+ throw new IllegalArgumentException("Cannot set max container capacity for " + containerName + " to " + maxCapacity + " bytes");
+ }
+
+ this.containerCapacities.put(containerName, maxCapacity);
+ }
/**
* Returns the maximum amount of time that a given record will stay in the
* repository
@@ -207,15 +233,28 @@ public class JournalingRepositoryConfig {
this.compress = compress;
}
- public int getThreadPoolSize() {
- return threadPoolSize;
+ public int getWorkerThreadPoolSize() {
+ return workerThreadPoolSize;
+ }
+
+ public void setWorkerThreadPoolSize(final int workerThreadPoolSize) {
+ if (workerThreadPoolSize < 1) {
+ throw new IllegalArgumentException();
+ }
+ this.workerThreadPoolSize = workerThreadPoolSize;
}
+
+
- public void setThreadPoolSize(final int queryThreadPoolSize) {
+ public int getQueryThreadPoolSize() {
+ return queryThreadPoolSize;
+ }
+
+ public void setQueryThreadPoolSize(int queryThreadPoolSize) {
if (queryThreadPoolSize < 1) {
throw new IllegalArgumentException();
}
- this.threadPoolSize = queryThreadPoolSize;
+ this.queryThreadPoolSize = queryThreadPoolSize;
}
/**
@@ -308,7 +347,7 @@ public class JournalingRepositoryConfig {
* Returns the minimum number of Provenance Events that should be written to a single Block.
* Events are written out in blocks, which are later optionally compressed. A larger block size
* will potentially result in better compression. However, a smaller block size will result
- * in better performance when reading the data. The default value is 100 events per block.
+ * in better performance when reading the data. The default value is 5000 events per block.
*
* @return
*/
@@ -320,7 +359,7 @@ public class JournalingRepositoryConfig {
* Sets the minimum number of Provenance Events that should be written to a single Block.
* Events are written out in blocks, which are later optionally compressed. A larger block size
* will potentially result in better compression. However, a smaller block size will result
- * in better performance when reading the data. The default value is 100 events per block.
+ * in better performance when reading the data. The default value is 5000 events per block.
*
* @return
*/
@@ -330,4 +369,20 @@ public class JournalingRepositoryConfig {
}
this.blockSize = blockSize;
}
+
+ public int getCompressionThreadPoolSize() {
+ return compressionThreadPoolSize;
+ }
+
+ public void setCompressionThreadPoolSize(final int compressionThreadPoolSize) {
+ this.compressionThreadPoolSize = compressionThreadPoolSize;
+ }
+
+ public long getExpirationFrequency(final TimeUnit unit) {
+ return unit.convert(expirationFrequency, TimeUnit.NANOSECONDS);
+ }
+
+ public void setExpirationFrequency(final long period, final TimeUnit unit) {
+ this.expirationFrequency = unit.toNanos(period);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
index 753378d..85e02c0 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
@@ -18,6 +18,7 @@ package org.apache.nifi.provenance.journaling.index;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
@@ -32,7 +33,53 @@ public interface EventIndexSearcher extends Closeable {
*/
SearchResult search(Query query) throws IOException;
+ /**
+ * Returns the locations of all events for a FlowFile that has a FlowFile UUID in the collection of
+ * UUIDs provided, if the event time occurs between earliestTime and latestTime. The return value is
+ * ordered in the order in which the records should be read from the journals in order to obtain
+ * maximum efficiency
+ *
+ * @param flowFileUuids
+ * @param earliestTime
+ * @param latestTime
+ *
+ * @return
+ * @throws IOException
+ */
+ List<JournaledStorageLocation> getEventsForFlowFiles(Collection<String> flowFileUuids, long earliestTime, long latestTime) throws IOException;
+
+ /**
+ * Returns the locations of events that have Event IDs at least equal to minEventId, and returns
+ * up to the given number of results
+ *
+ * @param minEventId
+ * @param maxResults
+ * @return
+ * @throws IOException
+ */
List<JournaledStorageLocation> getEvents(long minEventId, int maxResults) throws IOException;
+ /**
+ * Returns the largest event id that is known by the index being searched
+ * @param container
+ * @param section
+ * @return
+ * @throws IOException
+ */
Long getMaxEventId(String container, String section) throws IOException;
+
+ /**
+ * Returns the locations of the latest events for the index being searched
+ * @param numEvents
+ * @return
+ * @throws IOException
+ */
+ List<JournaledStorageLocation> getLatestEvents(int numEvents) throws IOException;
+
+ /**
+ * Returns the total number of events that exist for the index being searched
+ * @return
+ * @throws IOException
+ */
+ long getNumberOfEvents() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexWriter.java
index 1f231e9..a151838 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexWriter.java
@@ -44,5 +44,23 @@ public interface EventIndexWriter extends Closeable {
* @param journalId
* @throws IOException
*/
- void delete(String containerName, String section, String journalId) throws IOException;
+ void delete(String containerName, String section, Long journalId) throws IOException;
+
+ /**
+ * Deletes any records that belong to the given container and section but have a journal Id less
+ * than the specified value
+ * @param containerName
+ * @param section
+ * @param journalId
+ * @throws IOException
+ */
+ void deleteEventsBefore(String containerName, String section, Long journalId) throws IOException;
+
+
+ /**
+ * Removes all events from the index that occurred before the given time
+ * @param earliestEventTimeToDelete
+ * @throws IOException
+ */
+ void deleteOldEvents(long earliestEventTimeToDelete) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexAction.java
new file mode 100644
index 0000000..6486d56
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexAction.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.index;
+
+import java.io.IOException;
+
+public interface IndexAction<T> {
+ T perform(EventIndexSearcher searcher) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java
index 141b84a..34d1b18 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java
@@ -17,7 +17,9 @@
package org.apache.nifi.provenance.journaling.index;
import java.io.Closeable;
+import java.io.File;
import java.io.IOException;
+import java.util.Set;
public interface IndexManager extends Closeable {
@@ -37,5 +39,112 @@ public interface IndexManager extends Closeable {
*/
Long getMaxEventId(String container, String section) throws IOException;
+ /**
+ * Returns the total number of indices for all containers
+ * @return
+ */
+ int getNumberOfIndices();
+
+ /**
+ * Returns a new {@link EventIndexSearcher} that can be used to search the indices for
+ * the given container
+ *
+ * @param containerName the containerName to search
+ *
+ * @return
+ * @throws IOException
+ */
EventIndexSearcher newIndexSearcher(String containerName) throws IOException;
+
+ /**
+ * Executes the given action against each index and returns a Set of results,
+ * where each result is obtained from performing the given action against one index
+ *
+ * @param action the action to perform
+ * @return
+ * @throws IOException
+ */
+ <T> Set<T> withEachIndex(IndexAction<T> action) throws IOException;
+
+ /**
+ * Performs the given action against each index, waiting for the action to complete
+ * against each index before returning
+ *
+ * @param action the action to perform against each index
+ * @throws IOException
+ */
+ void withEachIndex(VoidIndexAction action) throws IOException;
+
+ /**
+ * Performs the given action against each index
+ *
+ * @param action the action to perform
+ *
+ * @param async if true, the method will return immediately and the actions will occur
+ * in the background. If <code>false</code>, the method will not return until the action
+ * has been performed against all indices
+ *
+ * @throws IOException
+ */
+ void withEachIndex(VoidIndexAction action, boolean async) throws IOException;
+
+ /**
+ * Removes any events that have a Storage Location that includes the provided containerName, sectionIndex, and journalId,
+ * and then re-adds all of the events that are in the given journalFile.
+ * @param containerName
+ * @param sectionIndex
+ * @param journalId
+ * @param journalFile
+ * @throws IOException
+ */
+ void reindex(final String containerName, final int sectionIndex, final Long journalId, final File journalFile) throws IOException;
+
+ /**
+ * Syncs all indices
+ * @throws IOException
+ */
+ void sync() throws IOException;
+
+ /**
+ * Returns the total number of events in all indices
+ * @return
+ */
+ long getNumberOfEvents() throws IOException;
+
+ /**
+ * Removes all events from all indices that occurred before the given time
+ * @param earliestEventTimeToDelete
+ * @throws IOException
+ */
+ void deleteOldEvents(long earliestEventTimeToDelete) throws IOException;
+
+ /**
+ * Deletes any events from the index that belong to the given container, section, and journal id
+ *
+ * @param containerName
+ * @param sectionIndex
+ * @param journalId
+ *
+ * @throws IOException
+ */
+ void deleteEvents(String containerName, int sectionIndex, Long journalId) throws IOException;
+
+ /**
+ * Deletes any events from the index that belong to the given container and section but have
+ * a journal id before the value specified
+ *
+ * @param containerName
+ * @param sectionIndex
+ * @param journalId
+ *
+ * @throws IOException
+ */
+ void deleteEventsBefore(String containerName, int sectionIndex, Long journalId) throws IOException;
+
+ /**
+ * Returns the size (in bytes) of the index for the given container
+ * @param containerName
+ * @return
+ */
+ long getSize(String containerName);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
index d10fd00..e212342 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
@@ -20,13 +20,21 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent;
import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
+import org.apache.nifi.provenance.journaling.toc.TocJournalReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,14 +42,14 @@ public class LuceneIndexManager implements IndexManager {
private static final Logger logger = LoggerFactory.getLogger(LuceneIndexManager.class);
private final JournalingRepositoryConfig config;
- private final ScheduledExecutorService executor;
+ private final ExecutorService queryExecutor;
private final Map<String, List<LuceneIndexWriter>> writers = new HashMap<>();
private final Map<String, AtomicLong> writerIndexes = new HashMap<>();
- public LuceneIndexManager(final JournalingRepositoryConfig config, final ScheduledExecutorService executor) throws IOException {
+ public LuceneIndexManager(final JournalingRepositoryConfig config, final ScheduledExecutorService workerExecutor, final ExecutorService queryExecutor) throws IOException {
this.config = config;
- this.executor = executor;
+ this.queryExecutor = queryExecutor;
final int rolloverSeconds = (int) config.getJournalRolloverPeriod(TimeUnit.SECONDS);
if ( !config.isReadOnly() ) {
@@ -58,7 +66,7 @@ public class LuceneIndexManager implements IndexManager {
writerList.add(new LuceneIndexWriter(indexDir, config));
}
- executor.scheduleWithFixedDelay(new Runnable() {
+ workerExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
@@ -150,6 +158,15 @@ public class LuceneIndexManager implements IndexManager {
return max;
}
+ @Override
+ public void sync() throws IOException {
+ for ( final List<LuceneIndexWriter> writerList : writers.values() ) {
+ for ( final LuceneIndexWriter writer : writerList ) {
+ writer.sync();
+ }
+ }
+ }
+
private void sync(final String containerName) throws IOException {
final AtomicLong index = writerIndexes.get(containerName);
@@ -175,4 +192,190 @@ public class LuceneIndexManager implements IndexManager {
}
}
}
+
+ @Override
+ public <T> Set<T> withEachIndex(final IndexAction<T> action) throws IOException {
+ final Set<T> results = new HashSet<>();
+ final Map<String, Future<T>> futures = new HashMap<>();
+ final Set<String> containerNames = config.getContainers().keySet();
+ for (final String containerName : containerNames) {
+ final Callable<T> callable = new Callable<T>() {
+ @Override
+ public T call() throws Exception {
+ try (final EventIndexSearcher searcher = newIndexSearcher(containerName)) {
+ return action.perform(searcher);
+ }
+ }
+ };
+
+ final Future<T> future = queryExecutor.submit(callable);
+ futures.put(containerName, future);
+ }
+
+ for ( final Map.Entry<String, Future<T>> entry : futures.entrySet() ) {
+ try {
+ final T result = entry.getValue().get();
+ results.add(result);
+ } catch (final ExecutionException ee) {
+ final Throwable cause = ee.getCause();
+ if ( cause instanceof IOException ) {
+ throw (IOException) cause;
+ } else {
+ throw new RuntimeException("Failed to query Container " + entry.getKey() + " due to " + cause, cause);
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ return results;
+ }
+
+ @Override
+ public void withEachIndex(final VoidIndexAction action) throws IOException {
+ withEachIndex(action, false);
+ }
+
+ @Override
+ public void withEachIndex(final VoidIndexAction action, final boolean async) throws IOException {
+ final Map<String, Future<?>> futures = new HashMap<>();
+ final Set<String> containerNames = config.getContainers().keySet();
+
+ for (final String containerName : containerNames) {
+ final Callable<Object> callable = new Callable<Object>() {
+ @Override
+ public Object call() throws IOException {
+ try (final EventIndexSearcher searcher = newIndexSearcher(containerName)) {
+ action.perform(searcher);
+ return null;
+ } catch (final Throwable t) {
+ if ( async ) {
+ logger.error("Failed to perform action against container " + containerName + " due to " + t, t);
+ if ( logger.isDebugEnabled() ) {
+ logger.error("", t);
+ }
+
+ return null;
+ } else {
+ throw new IOException("Failed to perform action against container " + containerName + " due to " + t, t);
+ }
+ }
+ }
+ };
+
+ final Future<?> future = queryExecutor.submit(callable);
+ futures.put(containerName, future);
+ }
+
+ if ( !async ) {
+ for ( final Map.Entry<String, Future<?>> entry : futures.entrySet() ) {
+ try {
+ // throw any exception thrown by runnable
+ entry.getValue().get();
+ } catch (final ExecutionException ee) {
+ final Throwable cause = ee.getCause();
+ if ( cause instanceof IOException ) {
+ throw ((IOException) cause);
+ }
+
+ throw new RuntimeException("Failed to query Partition " + entry.getKey() + " due to " + cause, cause);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public int getNumberOfIndices() {
+ return config.getContainers().size();
+ }
+
+ @Override
+ public void deleteEvents(final String containerName, final int sectionIndex, final Long journalId) throws IOException {
+ final List<LuceneIndexWriter> writerList = writers.get(containerName);
+ for ( final LuceneIndexWriter writer : writerList ) {
+ writer.delete(containerName, String.valueOf(sectionIndex), journalId);
+ }
+ }
+
+ @Override
+ public void deleteEventsBefore(final String containerName, final int sectionIndex, final Long journalId) throws IOException {
+ final List<LuceneIndexWriter> writerList = writers.get(containerName);
+ for ( final LuceneIndexWriter writer : writerList ) {
+ writer.deleteEventsBefore(containerName, String.valueOf(sectionIndex), journalId);
+ }
+ }
+
+ @Override
+ public void reindex(final String containerName, final int sectionIndex, final Long journalId, final File journalFile) throws IOException {
+ deleteEvents(containerName, sectionIndex, journalId);
+
+ final LuceneIndexWriter writer = getIndexWriter(containerName);
+ try (final TocJournalReader reader = new TocJournalReader(containerName, String.valueOf(sectionIndex), journalId, journalFile)) {
+ final List<JournaledProvenanceEvent> events = new ArrayList<>(1000);
+ JournaledProvenanceEvent event;
+
+ while ((event = reader.nextJournaledEvent()) != null) {
+ events.add(event);
+ if ( events.size() >= 1000 ) {
+ writer.index(events);
+ events.clear();
+ }
+ }
+
+ if (!events.isEmpty() ) {
+ writer.index(events);
+ }
+ }
+ }
+
+ @Override
+ public long getNumberOfEvents() throws IOException {
+ final AtomicLong totalCount = new AtomicLong(0L);
+ withEachIndex(new VoidIndexAction() {
+ @Override
+ public void perform(final EventIndexSearcher searcher) throws IOException {
+ totalCount.addAndGet(searcher.getNumberOfEvents());
+ }
+ });
+
+ return totalCount.get();
+ }
+
+ @Override
+ public void deleteOldEvents(final long earliestEventTimeToDelete) throws IOException {
+ for ( final String containerName : config.getContainers().keySet() ) {
+ final List<LuceneIndexWriter> writerList = writers.get(containerName);
+ for ( final LuceneIndexWriter writer : writerList ) {
+ writer.deleteOldEvents(earliestEventTimeToDelete);
+ }
+ }
+ }
+
+
+ @Override
+ public long getSize(final String containerName) {
+ final File containerFile = config.getContainers().get(containerName);
+ final File indicesDir = new File(containerFile, "indices");
+
+ return getSize(indicesDir);
+ }
+
+ private long getSize(final File file) {
+ if ( file.isDirectory() ) {
+ long totalSize = 0L;
+
+ final File[] children = file.listFiles();
+ if ( children != null ) {
+ for ( final File child : children ) {
+ totalSize += getSize(child);
+ }
+ }
+
+ return totalSize;
+ } else {
+ return file.length();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
index a9dd1a5..94bd3f8 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
@@ -19,6 +19,7 @@ package org.apache.nifi.provenance.journaling.index;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import org.apache.lucene.document.Document;
@@ -27,6 +28,7 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.NumericRangeQuery;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
@@ -34,7 +36,9 @@ import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortField.Type;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.store.FSDirectory;
+import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
import org.apache.nifi.provenance.search.Query;
@@ -43,16 +47,20 @@ public class LuceneIndexSearcher implements EventIndexSearcher {
private final IndexSearcher searcher;
private final FSDirectory fsDirectory;
+ private final String description;
+
public LuceneIndexSearcher(final File indexDirectory) throws IOException {
this.fsDirectory = FSDirectory.open(indexDirectory);
this.reader = DirectoryReader.open(fsDirectory);
this.searcher = new IndexSearcher(reader);
+ this.description = "LuceneIndexSearcher[indexDirectory=" + indexDirectory + "]";
}
- public LuceneIndexSearcher(final DirectoryReader reader) {
+ public LuceneIndexSearcher(final DirectoryReader reader, final File indexDirectory) {
this.reader = reader;
this.searcher = new IndexSearcher(reader);
this.fsDirectory = null;
+ this.description = "LuceneIndexSearcher[indexDirectory=" + indexDirectory + "]";
}
@Override
@@ -76,29 +84,35 @@ public class LuceneIndexSearcher implements EventIndexSearcher {
private JournaledStorageLocation createLocation(final Document document) {
final String containerName = document.get(IndexedFieldNames.CONTAINER_NAME);
final String sectionName = document.get(IndexedFieldNames.SECTION_NAME);
- final String journalId = document.get(IndexedFieldNames.JOURNAL_ID);
+ final long journalId = document.getField(IndexedFieldNames.JOURNAL_ID).numericValue().longValue();
final int blockIndex = document.getField(IndexedFieldNames.BLOCK_INDEX).numericValue().intValue();
final long eventId = document.getField(IndexedFieldNames.EVENT_ID).numericValue().longValue();
return new JournaledStorageLocation(containerName, sectionName, journalId, blockIndex, eventId);
}
- private List<JournaledStorageLocation> getLocations(final TopDocs topDocs) throws IOException {
+ private List<JournaledStorageLocation> getOrderedLocations(final TopDocs topDocs) throws IOException {
final ScoreDoc[] scoreDocs = topDocs.scoreDocs;
final List<JournaledStorageLocation> locations = new ArrayList<>(scoreDocs.length);
- for ( final ScoreDoc scoreDoc : scoreDocs ) {
+ populateLocations(topDocs, locations);
+
+ return locations;
+ }
+
+
+ private void populateLocations(final TopDocs topDocs, final Collection<JournaledStorageLocation> locations) throws IOException {
+ for ( final ScoreDoc scoreDoc : topDocs.scoreDocs ) {
final Document document = reader.document(scoreDoc.doc);
locations.add(createLocation(document));
}
-
- return locations;
}
+
@Override
public SearchResult search(final Query provenanceQuery) throws IOException {
final org.apache.lucene.search.Query luceneQuery = QueryUtils.convertQueryToLucene(provenanceQuery);
final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults());
- final List<JournaledStorageLocation> locations = getLocations(topDocs);
+ final List<JournaledStorageLocation> locations = getOrderedLocations(topDocs);
return new SearchResult(locations, topDocs.totalHits);
}
@@ -109,7 +123,7 @@ public class LuceneIndexSearcher implements EventIndexSearcher {
query.add(NumericRangeQuery.newLongRange(IndexedFieldNames.EVENT_ID, minEventId, null, true, true), Occur.MUST);
final TopDocs topDocs = searcher.search(query, maxResults, new Sort(new SortField(IndexedFieldNames.EVENT_ID, Type.LONG)));
- return getLocations(topDocs);
+ return getOrderedLocations(topDocs);
}
@Override
@@ -125,7 +139,7 @@ public class LuceneIndexSearcher implements EventIndexSearcher {
}
final TopDocs topDocs = searcher.search(query, 1, new Sort(new SortField(IndexedFieldNames.EVENT_ID, Type.LONG, true)));
- final List<JournaledStorageLocation> locations = getLocations(topDocs);
+ final List<JournaledStorageLocation> locations = getOrderedLocations(topDocs);
if ( locations.isEmpty() ) {
return null;
}
@@ -133,4 +147,45 @@ public class LuceneIndexSearcher implements EventIndexSearcher {
return locations.get(0).getEventId();
}
+ /**
+  * Finds the storage locations of the events that reference any of the given FlowFile UUIDs and whose
+  * Event Time lies within the given range (both bounds inclusive). At most 1000 hits are requested
+  * from the index.
+  *
+  * @param flowFileUuids the UUIDs of the FlowFiles of interest; may be null or empty
+  * @param earliestTime the lower bound (inclusive) of the Event Time range
+  * @param latestTime the upper bound (inclusive) of the Event Time range
+  * @return the locations of the matching events; an empty list if no UUIDs were given
+  * @throws IOException if an I/O error occurs while reading from the index
+  */
+ @Override
+ public List<JournaledStorageLocation> getEventsForFlowFiles(final Collection<String> flowFileUuids, final long earliestTime, final long latestTime) throws IOException {
+     // Without any FlowFile ID's there is nothing to look up. Previously the query was left null in this
+     // case and then dereferenced below, which always failed with a NullPointerException.
+     if (flowFileUuids == null || flowFileUuids.isEmpty()) {
+         return new ArrayList<JournaledStorageLocation>();
+     }
+
+     // Create a query for all Events related to the FlowFiles of interest. We do this by adding all ID's as
+     // "SHOULD" clauses and then setting the minimum required to 1.
+     final BooleanQuery flowFileIdQuery = new BooleanQuery();
+     for (final String flowFileUuid : flowFileUuids) {
+         flowFileIdQuery.add(new TermQuery(new Term(SearchableFields.FlowFileUUID.getSearchableFieldName(), flowFileUuid)), Occur.SHOULD);
+     }
+     flowFileIdQuery.setMinimumNumberShouldMatch(1);
+
+     // In addition to matching one of the ID's, the event must fall within the requested time range.
+     flowFileIdQuery.add(NumericRangeQuery.newLongRange(SearchableFields.EventTime.getSearchableFieldName(),
+         earliestTime, latestTime, true, true), Occur.MUST);
+
+     final TopDocs topDocs = searcher.search(flowFileIdQuery, 1000);
+     return getOrderedLocations(topDocs);
+ }
+
+
+ /**
+  * Looks up the events with the highest Event ID's in this index.
+  *
+  * @param numEvents the maximum number of hits to request from the index
+  * @return the locations of the matching events, in the order returned by the search (Event ID, reversed)
+  * @throws IOException if an I/O error occurs while reading from the index
+  */
+ @Override
+ public List<JournaledStorageLocation> getLatestEvents(final int numEvents) throws IOException {
+     // Sort on the Event ID in reverse so that the largest ID's are returned first.
+     final SortField newestFirst = new SortField(IndexedFieldNames.EVENT_ID, Type.LONG, true);
+     final TopFieldDocs newestDocs = searcher.search(new MatchAllDocsQuery(), numEvents, new Sort(newestFirst));
+     return getOrderedLocations(newestDocs);
+ }
+
+ /**
+  * @return the description held by this searcher
+  */
+ @Override
+ public String toString() {
+ return description;
+ }
+
+ /**
+  * @return the document count ({@code numDocs()}) reported by this searcher's index reader
+  */
+ @Override
+ public long getNumberOfEvents() {
+ return reader.numDocs();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
index b61ad34..d1b2c0e 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
@@ -21,10 +21,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.analysis.Analyzer;
@@ -33,6 +35,7 @@ import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
@@ -40,6 +43,8 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.NumericRangeQuery;
+import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
@@ -61,6 +66,7 @@ public class LuceneIndexWriter implements EventIndexWriter {
private final JournalingRepositoryConfig config;
private final Set<SearchableField> nonAttributeSearchableFields;
private final Set<SearchableField> attributeSearchableFields;
+ private final File indexDir;
private final Directory directory;
private final Analyzer analyzer;
@@ -68,6 +74,7 @@ public class LuceneIndexWriter implements EventIndexWriter {
private final AtomicLong indexMaxId = new AtomicLong(-1L);
public LuceneIndexWriter(final File indexDir, final JournalingRepositoryConfig config) throws IOException {
+ this.indexDir = indexDir;
this.config = config;
attributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(config.getSearchableAttributes()));
@@ -76,12 +83,20 @@ public class LuceneIndexWriter implements EventIndexWriter {
directory = FSDirectory.open(indexDir);
analyzer = new StandardAnalyzer();
final IndexWriterConfig writerConfig = new IndexWriterConfig(Version.LATEST, analyzer);
+ // Increase number of concurrent merges since we are on SSD:
+ final ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
+ writerConfig.setMergeScheduler(cms);
+ final int mergeThreads = Math.max(2, Math.min(4, config.getWorkerThreadPoolSize() / 2));
+ cms.setMaxMergesAndThreads(mergeThreads, mergeThreads);
+
indexWriter = new IndexWriter(directory, writerConfig);
}
public EventIndexSearcher newIndexSearcher() throws IOException {
+ logger.trace("Creating index searcher for {}", indexWriter);
+
final DirectoryReader reader = DirectoryReader.open(indexWriter, false);
- return new LuceneIndexSearcher(reader);
+ return new LuceneIndexSearcher(reader, indexDir);
}
@Override
@@ -119,6 +134,8 @@ public class LuceneIndexWriter implements EventIndexWriter {
public void index(final Collection<JournaledProvenanceEvent> events) throws IOException {
long maxId = this.indexMaxId.get();
+ final long startNanos = System.nanoTime();
+
final List<Document> documents = new ArrayList<>(events.size());
for ( final JournaledProvenanceEvent event : events ) {
maxId = event.getEventId();
@@ -154,7 +171,7 @@ public class LuceneIndexWriter implements EventIndexWriter {
final JournaledStorageLocation location = event.getStorageLocation();
doc.add(new StringField(IndexedFieldNames.CONTAINER_NAME, location.getContainerName(), Store.YES));
doc.add(new StringField(IndexedFieldNames.SECTION_NAME, location.getSectionName(), Store.YES));
- doc.add(new StringField(IndexedFieldNames.JOURNAL_ID, location.getJournalId(), Store.YES));
+ doc.add(new LongField(IndexedFieldNames.JOURNAL_ID, location.getJournalId(), Store.YES));
doc.add(new LongField(IndexedFieldNames.BLOCK_INDEX, location.getBlockIndex(), Store.YES));
doc.add(new LongField(IndexedFieldNames.EVENT_ID, location.getEventId(), Store.YES));
@@ -207,22 +224,59 @@ public class LuceneIndexWriter implements EventIndexWriter {
updated = true;
}
} while (!updated);
+
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ logger.debug("Indexed {} events in {} millis with {}", events.size(), millis, this);
}
@Override
- public void delete(final String containerName, final String section, final String journalId) throws IOException {
+ public void delete(final String containerName, final String section, final Long journalId) throws IOException {
+ // Deletes the indexed documents that match the given container, section and journal.
+ // All three clauses are added with Occur.MUST, so a document has to satisfy every one of them.
+ final BooleanQuery query = new BooleanQuery();
+ query.add(new BooleanClause(new TermQuery(new Term(IndexedFieldNames.CONTAINER_NAME, containerName)), Occur.MUST));
+ query.add(new BooleanClause(new TermQuery(new Term(IndexedFieldNames.SECTION_NAME, section)), Occur.MUST));
+ // The journal ID is matched with a numeric range whose lower and upper bound are both journalId (inclusive).
+ // NOTE(review): journalId is a boxed Long; Lucene documents a null bound as open-ended, so a null journalId
+ // would presumably match every journal in the section -- confirm that callers never pass null.
+ query.add(NumericRangeQuery.newLongRange(IndexedFieldNames.JOURNAL_ID, journalId, journalId, true, true), Occur.MUST);
+
+ // Time only the delete call so that the log message reports the cost of the deletion itself.
+ final long start = System.nanoTime();
+ indexWriter.deleteDocuments(query);
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ logger.info("Deleted events from {} that matched container={}, section={}, journal={} in {} millis", indexWriter, containerName, section, journalId, millis);
+ }
+
+ @Override
+ public void deleteEventsBefore(final String containerName, final String section, final Long journalId) throws IOException {
final BooleanQuery query = new BooleanQuery();
query.add(new BooleanClause(new TermQuery(new Term(IndexedFieldNames.CONTAINER_NAME, containerName)), Occur.MUST));
query.add(new BooleanClause(new TermQuery(new Term(IndexedFieldNames.SECTION_NAME, section)), Occur.MUST));
- query.add(new BooleanClause(new TermQuery(new Term(IndexedFieldNames.JOURNAL_ID, journalId)), Occur.MUST));
+ query.add(NumericRangeQuery.newLongRange(IndexedFieldNames.JOURNAL_ID, 0L, journalId, true, false), Occur.MUST);
+ final long start = System.nanoTime();
indexWriter.deleteDocuments(query);
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ logger.info("Deleted events from {} that matched container={}, section={}, journal less than {} in {} millis", indexWriter, containerName, section, journalId, millis);
}
+ /**
+  * Deletes from the index every event whose Event Time lies between 0 and the given time, both inclusive.
+  *
+  * @param earliestEventTimeToDelete the upper bound (inclusive) of the Event Times to delete
+  * @throws IOException if the index writer fails while deleting the documents
+  */
@Override
+ public void deleteOldEvents(final long earliestEventTimeToDelete) throws IOException {
+     // Select everything from time 0 up to and including the cutoff.
+     final Query query = NumericRangeQuery.newLongRange(SearchableFields.EventTime.getSearchableFieldName(), 0L, earliestEventTimeToDelete, true, true);
+
+     // Time only the delete call so that the log message reports the cost of the deletion itself.
+     final long start = System.nanoTime();
+     indexWriter.deleteDocuments(query);
+     final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+     logger.info("Deleted events from {} that occurred before {}; deletion took {} millis", this, new Date(earliestEventTimeToDelete), millis);
+ }
+
+ /**
+  * Commits the index writer and logs how long the commit took.
+  *
+  * @throws IOException if the commit fails
+  */
+ @Override
public void sync() throws IOException {
+ final long start = System.nanoTime();
indexWriter.commit();
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ logger.info("Successfully sync'ed {} in {} millis", this, millis);
+ }
+
+ /**
+  * @return a description of this writer that names its index directory
+  */
+ @Override
+ public String toString() {
+ return "LuceneIndexWriter[indexDir=" + indexDir + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java
index d086ff5..4accf50 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java
@@ -18,7 +18,9 @@ package org.apache.nifi.provenance.journaling.index;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
@@ -63,6 +65,7 @@ public class MultiIndexSearcher implements EventIndexSearcher {
locations.addAll(result.getLocations());
}
+ Collections.sort(locations);
return new SearchResult(locations, totalHitCount);
}
@@ -109,4 +112,52 @@ public class MultiIndexSearcher implements EventIndexSearcher {
return max;
}
+ /**
+  * Asks every wrapped searcher for the events that belong to the given FlowFiles and merges the answers
+  * into a single list, sorted according to the natural ordering of JournaledStorageLocation.
+  *
+  * @param flowFileUuids the UUIDs of the FlowFiles of interest
+  * @param earliestTime the earliest time, passed through to each searcher
+  * @param latestTime the latest time, passed through to each searcher
+  * @return the merged, sorted locations from all searchers
+  * @throws IOException if any of the wrapped searchers fails
+  */
+ @Override
+ public List<JournaledStorageLocation> getEventsForFlowFiles(final Collection<String> flowFileUuids, final long earliestTime, final long latestTime) throws IOException {
+     final List<JournaledStorageLocation> merged = new ArrayList<>();
+     for ( final EventIndexSearcher delegate : searchers ) {
+         final List<JournaledStorageLocation> found = delegate.getEventsForFlowFiles(flowFileUuids, earliestTime, latestTime);
+         // A searcher that has nothing to contribute is simply skipped.
+         if ( found == null || found.isEmpty() ) {
+             continue;
+         }
+         merged.addAll(found);
+     }
+
+     Collections.sort(merged);
+     return merged;
+ }
+
+ /**
+  * Collects the latest events from every wrapped searcher and returns the newest of them, ordered by
+  * ascending Event ID.
+  *
+  * @param numEvents the maximum number of events to return
+  * @return at most {@code numEvents} locations, ordered by ascending Event ID
+  * @throws IOException if any of the wrapped searchers fails
+  */
+ @Override
+ public List<JournaledStorageLocation> getLatestEvents(final int numEvents) throws IOException {
+     final List<JournaledStorageLocation> locations = new ArrayList<>();
+     for ( final EventIndexSearcher searcher : searchers ) {
+         final List<JournaledStorageLocation> indexLocations = searcher.getLatestEvents(numEvents);
+         if ( indexLocations != null && !indexLocations.isEmpty() ) {
+             locations.addAll(indexLocations);
+         }
+     }
+
+     Collections.sort(locations, new Comparator<JournaledStorageLocation>() {
+         @Override
+         public int compare(final JournaledStorageLocation o1, final JournaledStorageLocation o2) {
+             return Long.compare(o1.getEventId(), o2.getEventId());
+         }
+     });
+
+     // Every searcher may contribute up to numEvents entries, so the merged list can be larger than what
+     // was asked for. The list is in ascending Event ID order, so the newest entries are at the tail.
+     if ( numEvents > 0 && locations.size() > numEvents ) {
+         return new ArrayList<JournaledStorageLocation>(locations.subList(locations.size() - numEvents, locations.size()));
+     }
+     return locations;
+ }
+
+ /**
+  * Adds up the event counts reported by all wrapped searchers.
+  *
+  * @return the sum of {@code getNumberOfEvents()} over every wrapped searcher
+  * @throws IOException if any of the wrapped searchers fails
+  */
+ @Override
+ public long getNumberOfEvents() throws IOException {
+     long eventTotal = 0;
+     for ( final EventIndexSearcher delegate : searchers ) {
+         eventTotal = eventTotal + delegate.getNumberOfEvents();
+     }
+     return eventTotal;
+ }
+
+ /**
+  * @return the string form of the collection of wrapped searchers
+  */
+ @Override
+ public String toString() {
+ return searchers.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java
index 4ae4b16..d8dd5eb 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java
@@ -91,6 +91,15 @@ public class QueryUtils {
}
+ /**
+ * Orders the given StorageLocations so that we have a Map where the Key is a Journal file and the value is a List of JournaledStorageLocation in the order
+ * in which they should be read from the journal for optimal performance
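+ * @param locations the StorageLocations to group by Journal file and order
+ * @param config the repository configuration
+ * @return a Map from each Journal file to the ordered List of JournaledStorageLocation to read from it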
+ * @throws FileNotFoundException
+ * @throws IOException
+ */
public static Map<File, List<JournaledStorageLocation>> orderLocations(final List<StorageLocation> locations, final JournalingRepositoryConfig config) throws FileNotFoundException, IOException {
final Map<File, List<JournaledStorageLocation>> map = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/VoidIndexAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/VoidIndexAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/VoidIndexAction.java
new file mode 100644
index 0000000..a1ae22f
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/VoidIndexAction.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.index;
+
+import java.io.IOException;
+
+/**
+ * An action that is performed with an {@link EventIndexSearcher} and does not return a result.
+ */
+public interface VoidIndexAction {
+ /**
+ * Performs this action using the given searcher.
+ *
+ * @param searcher the searcher made available to the action
+ * @throws IOException if the action fails with an I/O error
+ */
+ void perform(EventIndexSearcher searcher) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalReader.java
index 535d1dd..3345f50 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalReader.java
@@ -44,4 +44,12 @@ public interface JournalReader extends Closeable {
* @return
*/
long getPosition();
+
+ /**
+ * Retrieves the last event from the journal, given the offset of the last Block in the journal
+ * @param blockOffset the offset of the last Block in the journal
+ * @return the last event in the journal. NOTE(review): StandardJournalReader returns null when it
+ * could not read any event -- confirm whether that is part of this contract
+ * @throws IOException if an I/O error occurs while reading the journal
+ */
+ ProvenanceEventRecord getLastEvent(long blockOffset) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
index 2ec5131..df4a2d0 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
@@ -62,13 +62,21 @@ public class StandardJournalReader implements JournalReader {
+ /**
+  * Opens the journal file from the beginning, reads its header (codec name, serialization version and
+  * compression flag), selects the matching deserializer and resets the decompressed stream.
+  * If any of that fails, the newly opened file stream is closed before the failure is reported.
+  *
+  * @throws IOException if the file cannot be opened or the header cannot be processed
+  */
private void resetStreams() throws IOException {
final InputStream bufferedIn = new BufferedInputStream(new FileInputStream(file));
compressedStream = new ByteCountingInputStream(bufferedIn);
- final DataInputStream dis = new DataInputStream(compressedStream);
- final String codecName = dis.readUTF();
- serializationVersion = dis.readInt();
- compressed = dis.readBoolean();
- deserializer = Deserializers.getDeserializer(codecName);
-
- resetDecompressedStream();
+ try {
+     final DataInputStream dis = new DataInputStream(compressedStream);
+     final String codecName = dis.readUTF();
+     serializationVersion = dis.readInt();
+     compressed = dis.readBoolean();
+     deserializer = Deserializers.getDeserializer(codecName);
+
+     resetDecompressedStream();
+ } catch (final Exception e) {
+     // Do not leak the file handle that was just opened. A failure to close is deliberately ignored
+     // because the original problem, reported below, is the one that matters to the caller.
+     try {
+         compressedStream.close();
+     } catch (final IOException ignore) {}
+
+     throw new IOException("Failed to reset data stream when reading " + file, e);
+ }
}
private void resetDecompressedStream() throws IOException {
@@ -80,6 +88,15 @@ public class StandardJournalReader implements JournalReader {
}
+ /**
+  * Closes the compressed stream and, if it is a distinct object, the decompressed stream as well.
+  * The decompressed stream is closed even when closing the compressed stream fails.
+  *
+  * @throws IOException if closing either stream fails
+  */
@Override
+ public void close() throws IOException {
+     try {
+         compressedStream.close();
+     } finally {
+         // The decompressed stream is not assigned if resetting the streams failed part way through,
+         // so guard against null as well as against closing the same stream twice.
+         if ( decompressedStream != null && compressedStream != decompressedStream ) {
+             decompressedStream.close();
+         }
+     }
+ }
+
+ @Override
public ProvenanceEventRecord nextEvent() throws IOException {
return nextEvent(true);
}
@@ -145,7 +162,7 @@ public class StandardJournalReader implements JournalReader {
// of the file. We do this because we know that the ID's are always increasing, so if we need an ID less
// than the previous ID, we have to go backward in the file. We can't do this with streams, so start the
// stream over.
- if ( eventId < lastEventIdRead ) {
+ if ( eventId <= lastEventIdRead ) {
close();
resetStreams();
}
@@ -167,12 +184,36 @@ public class StandardJournalReader implements JournalReader {
}
@Override
- public void close() throws IOException {
- decompressedStream.close();
- }
-
- @Override
public String toString() {
return "StandardJournalReader[" + file + "]";
}
+
+ /**
+  * Retrieves the last event in the journal by positioning the stream at the given block offset and
+  * reading events until none are left. If nothing could be read from a non-zero offset, the journal
+  * is read again from the start.
+  *
+  * @param blockOffset the offset of the last Block in the journal
+  * @return the last event that could be read, or null if no event could be read at all
+  * @throws IOException if an I/O error occurs while reading the journal
+  */
+ @Override
+ public ProvenanceEventRecord getLastEvent(final long blockOffset) throws IOException {
+     // Streams only move forward. If more bytes have been consumed than the requested offset, the only
+     // way to get back to it is to start the stream over. The previous check was inverted (it restarted
+     // only when the offset was still ahead), so the fallback below could never re-read the journal.
+     if ( blockOffset < compressedStream.getBytesConsumed() ) {
+         close();
+         resetStreams();
+     }
+
+     final long bytesToSkip = blockOffset - compressedStream.getBytesConsumed();
+     if ( bytesToSkip > 0 ) {
+         StreamUtils.skip(compressedStream, bytesToSkip);
+         resetDecompressedStream();
+     }
+
+     ProvenanceEventRecord lastReadRecord = null;
+     ProvenanceEventRecord event;
+     while ((event = nextEvent()) != null) {
+         lastReadRecord = event;
+     }
+
+     // If we weren't able to read anything and the block offset was given, just start over
+     // and read the entire thing, returning the last event.
+     if ( lastReadRecord == null && blockOffset > 0L ) {
+         return getLastEvent(0L);
+     }
+
+     // return the last event that we read, whether or not it was null
+     return lastReadRecord;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
index af5f8de..8d322b9 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
@@ -16,11 +16,17 @@
*/
package org.apache.nifi.provenance.journaling.journals;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.EOFException;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.file.FileAlreadyExistsException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
@@ -30,6 +36,8 @@ import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -82,11 +90,14 @@ import org.apache.nifi.stream.io.ByteCountingOutputStream;
*
*/
public class StandardJournalWriter implements JournalWriter {
+ private static final Logger logger = LoggerFactory.getLogger(StandardJournalWriter.class);
+
private final long journalId;
private final File journalFile;
private final boolean compressed;
private final Serializer serializer;
private final long creationTime = System.nanoTime();
+ private final String description;
private int eventCount;
private boolean blockStarted = false;
@@ -101,10 +112,28 @@ public class StandardJournalWriter implements JournalWriter {
public StandardJournalWriter(final long journalId, final File journalFile, final boolean compressed, final Serializer serializer) throws IOException {
+ if ( journalFile.exists() ) {
+ // Check if there is actually any data here.
+ try (final InputStream fis = new FileInputStream(journalFile);
+ final InputStream bufferedIn = new BufferedInputStream(fis);
+ final DataInputStream dis = new DataInputStream(bufferedIn) ) {
+ dis.readUTF();
+ dis.readInt();
+ dis.readBoolean();
+ final int nextByte = dis.read();
+ if ( nextByte > -1 ) {
+ throw new FileAlreadyExistsException(journalFile.getAbsolutePath());
+ }
+ } catch (final EOFException eof) {
+ // If we catch an EOF, there's no real data here, so we can overwrite the file.
+ }
+ }
+
this.journalId = journalId;
this.journalFile = journalFile;
this.compressed = compressed;
this.serializer = serializer;
+ this.description = "Journal Writer for " + journalFile;
this.fos = new FileOutputStream(journalFile);
uncompressedStream = new ByteCountingOutputStream(fos);
@@ -164,6 +193,7 @@ public class StandardJournalWriter implements JournalWriter {
@Override
public void write(final Collection<ProvenanceEventRecord> events, final long firstEventId) throws IOException {
+ final long start = System.nanoTime();
final int avgRecordSize = (int) (recordBytes / recordCount);
final ByteArrayOutputStream baos = new ByteArrayOutputStream(avgRecordSize);
@@ -192,6 +222,9 @@ public class StandardJournalWriter implements JournalWriter {
} finally {
outDos.flush();
}
+
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ logger.debug("Finished writing {} events to {} in {} millis", events.size(), this, millis);
}
@@ -202,7 +235,10 @@ public class StandardJournalWriter implements JournalWriter {
+ /**
+  * Calls {@code sync()} on the file descriptor of the journal's output stream and logs how long it took.
+  *
+  * @throws IOException if the sync fails
+  */
@Override
public void sync() throws IOException {
+ final long start = System.nanoTime();
fos.getFD().sync();
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()- start);
+ logger.debug("Successfully sync'ed {} in {} millis", this, millis);
}
@Override
@@ -259,6 +295,6 @@ public class StandardJournalWriter implements JournalWriter {
@Override
public String toString() {
- return "Journal Writer for " + journalFile;
+ return description;
}
}