You are viewing a plain text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this plain-text version.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/07/30 15:55:24 UTC
nifi git commit: NIFI-793: Added multi-threading to the indexing in the Persistent Provenance Repository
Repository: nifi
Updated Branches:
refs/heads/NIFI-793 [created] f260ec760
NIFI-793: Added multi-threading to the indexing in the Persistent Provenance Repository
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f260ec76
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f260ec76
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f260ec76
Branch: refs/heads/NIFI-793
Commit: f260ec760241f8aeaaa96e97e40282e603f6cea2
Parents: 75ed16c
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jul 29 14:28:10 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Jul 30 09:54:50 2015 -0400
----------------------------------------------------------------------
nifi/nifi-assembly/pom.xml | 1 +
.../org/apache/nifi/util/NiFiProperties.java | 1 +
.../src/main/resources/conf/nifi.properties | 1 +
.../PersistentProvenanceRepository.java | 146 +++++++++++++++----
.../provenance/RepositoryConfiguration.java | 34 +++--
5 files changed, 143 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/f260ec76/nifi/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-assembly/pom.xml b/nifi/nifi-assembly/pom.xml
index 60a6545..34d6c25 100644
--- a/nifi/nifi-assembly/pom.xml
+++ b/nifi/nifi-assembly/pom.xml
@@ -280,6 +280,7 @@ language governing permissions and limitations under the License. -->
<nifi.provenance.repository.rollover.time>30 secs</nifi.provenance.repository.rollover.time>
<nifi.provenance.repository.rollover.size>100 MB</nifi.provenance.repository.rollover.size>
<nifi.provenance.repository.query.threads>2</nifi.provenance.repository.query.threads>
+ <nifi.provenance.repository.index.threads>1</nifi.provenance.repository.index.threads>
<nifi.provenance.repository.compress.on.rollover>true</nifi.provenance.repository.compress.on.rollover>
<nifi.provenance.repository.indexed.fields>EventType, FlowFileUUID, Filename, ProcessorID, Relationship</nifi.provenance.repository.indexed.fields>
<nifi.provenance.repository.indexed.attributes />
http://git-wip-us.apache.org/repos/asf/nifi/blob/f260ec76/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index e25f5d6..520e0ba 100644
--- a/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ b/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -99,6 +99,7 @@ public class NiFiProperties extends Properties {
public static final String PROVENANCE_ROLLOVER_TIME = "nifi.provenance.repository.rollover.time";
public static final String PROVENANCE_ROLLOVER_SIZE = "nifi.provenance.repository.rollover.size";
public static final String PROVENANCE_QUERY_THREAD_POOL_SIZE = "nifi.provenance.repository.query.threads";
+ public static final String PROVENANCE_INDEX_THREAD_POOL_SIZE = "nifi.provenance.repository.index.threads";
public static final String PROVENANCE_COMPRESS_ON_ROLLOVER = "nifi.provenance.repository.compress.on.rollover";
public static final String PROVENANCE_INDEXED_FIELDS = "nifi.provenance.repository.indexed.fields";
public static final String PROVENANCE_INDEXED_ATTRIBUTES = "nifi.provenance.repository.indexed.attributes";
http://git-wip-us.apache.org/repos/asf/nifi/blob/f260ec76/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 4043076..63e5391 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -71,6 +71,7 @@ nifi.provenance.repository.max.storage.size=${nifi.provenance.repository.max.sto
nifi.provenance.repository.rollover.time=${nifi.provenance.repository.rollover.time}
nifi.provenance.repository.rollover.size=${nifi.provenance.repository.rollover.size}
nifi.provenance.repository.query.threads=${nifi.provenance.repository.query.threads}
+nifi.provenance.repository.index.threads=${nifi.provenance.repository.index.threads}
nifi.provenance.repository.compress.on.rollover=${nifi.provenance.repository.compress.on.rollover}
nifi.provenance.repository.always.sync=${nifi.provenance.repository.always.sync}
nifi.provenance.repository.journal.count=${nifi.provenance.repository.journal.count}
http://git-wip-us.apache.org/repos/asf/nifi/blob/f260ec76/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 81d883a..4408e3d 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -38,6 +38,7 @@ import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -45,6 +46,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -95,6 +97,7 @@ import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.RingBuffer;
import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -278,6 +281,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
final String rolloverSize = properties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB");
final String shardSize = properties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 MB");
final int queryThreads = properties.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE, 2);
+ final int indexThreads = properties.getIntegerProperty(
+ NiFiProperties.PROVENANCE_INDEX_THREAD_POOL_SIZE, 1);
final int journalCount = properties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16);
final long storageMillis = FormatUtils.getTimeDuration(storageTime, TimeUnit.MILLISECONDS);
@@ -326,6 +331,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
config.setMaxRecordLife(storageMillis, TimeUnit.MILLISECONDS);
config.setMaxStorageCapacity(maxStorageBytes);
config.setQueryThreadPoolSize(queryThreads);
+ config.setIndexThreadPoolSize(indexThreads);
config.setJournalCount(journalCount);
config.setMaxAttributeChars(maxAttrChars);
@@ -795,7 +801,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
*
* @throws IOException if unable to purge old events due to an I/O problem
*/
- void purgeOldEvents() throws IOException {
+ synchronized void purgeOldEvents() throws IOException {
while (!recoveryFinished.get()) {
try {
Thread.sleep(100L);
@@ -1009,6 +1015,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
if (fileRolledOver == null) {
+ logger.debug("Couldn't merge journals. Will try again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
return;
}
final File file = fileRolledOver;
@@ -1063,19 +1070,31 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
if (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
logger.warn("The rate of the dataflow is exceeding the provenance recording rate. "
- + "Slowing down flow to accomodate. Currently, there are {} journal files ({} bytes) and "
+ + "Slowing down flow to accommodate. Currently, there are {} journal files ({} bytes) and "
+ "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is "
- + "exceeding the provenance recording rate. Slowing down flow to accomodate");
+ + "exceeding the provenance recording rate. Slowing down flow to accommodate");
while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
- try {
- Thread.sleep(1000L);
- } catch (final InterruptedException ie) {
+ if (repoSize > sizeThreshold) {
+ logger.debug("Provenance Repository has exceeded its size threshold; will trigger purging of oldest events");
+ purgeOldEvents();
+
+ journalFileCount = getJournalCount();
+ repoSize = getSize(getLogFiles(), 0L);
+ continue;
+ } else {
+ // if we are constrained by the number of journal files rather than the size of the repo,
+ // then we will just sleep a bit because another thread is already actively merging the journals,
+ // due to the runnable that we scheduled above
+ try {
+ Thread.sleep(100L);
+ } catch (final InterruptedException ie) {
+ }
}
logger.debug("Provenance Repository is still behind. Keeping flow slowed down "
- + "to accomodate. Currently, there are {} journal files ({} bytes) and "
+ + "to accommodate. Currently, there are {} journal files ({} bytes) and "
+ "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
journalFileCount = getJournalCount();
@@ -1169,6 +1188,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
if (journalFiles.isEmpty()) {
+ logger.debug("Couldn't merge journals: Journal Files is empty; won't merge journals");
return null;
}
@@ -1328,45 +1348,110 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
final IndexingAction indexingAction = new IndexingAction(this);
final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile, earliestTimestamp);
+ long maxId = 0L;
+
+ final BlockingQueue<Tuple<StandardProvenanceEventRecord, Integer>> eventQueue = new LinkedBlockingQueue<>(100);
+ final AtomicBoolean finishedAdding = new AtomicBoolean(false);
+ final List<Future<?>> futures = new ArrayList<>();
+
final IndexWriter indexWriter = indexManager.borrowIndexWriter(indexingDirectory);
try {
- long maxId = 0L;
+ final ExecutorService exec = Executors.newFixedThreadPool(configuration.getIndexThreadPoolSize(), new ThreadFactory() {
+ @Override
+ public Thread newThread(final Runnable r) {
+ final Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setName("Index Provenance Events");
+ return t;
+ }
+ });
- while (!recordToReaderMap.isEmpty()) {
- final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
- final StandardProvenanceEventRecord record = entry.getKey();
- final RecordReader reader = entry.getValue();
+ try {
+ for (int i = 0; i < 6; i++) {
+ final Callable<Object> callable = new Callable<Object>() {
+ @Override
+ public Object call() throws IOException {
+ while (!eventQueue.isEmpty() || !finishedAdding.get()) {
+ final Tuple<StandardProvenanceEventRecord, Integer> tuple;
+ try {
+ tuple = eventQueue.poll(10, TimeUnit.MILLISECONDS);
+ } catch (final InterruptedException ie) {
+ continue;
+ }
+
+ if (tuple == null) {
+ continue;
+ }
+
+ indexingAction.index(tuple.getKey(), indexWriter, tuple.getValue());
+ }
- writer.writeRecord(record, record.getEventId());
- final int blockIndex = writer.getTocWriter().getCurrentBlockIndex();
+ return null;
+ }
+ };
- indexingAction.index(record, indexWriter, blockIndex);
- maxId = record.getEventId();
+ final Future<?> future = exec.submit(callable);
+ futures.add(future);
+ }
- latestRecords.add(truncateAttributes(record));
- records++;
+ while (!recordToReaderMap.isEmpty()) {
+ final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
+ final StandardProvenanceEventRecord record = entry.getKey();
+ final RecordReader reader = entry.getValue();
- // Remove this entry from the map
- recordToReaderMap.remove(record);
+ writer.writeRecord(record, record.getEventId());
+ final int blockIndex = writer.getTocWriter().getCurrentBlockIndex();
- // Get the next entry from this reader and add it to the map
- StandardProvenanceEventRecord nextRecord = null;
+ boolean accepted = false;
+ while (!accepted) {
+ try {
+ accepted = eventQueue.offer(new Tuple<>(record, blockIndex), 10, TimeUnit.MILLISECONDS);
+ } catch (final InterruptedException ie) {
+ }
+ }
+ maxId = record.getEventId();
- try {
- nextRecord = reader.nextRecord();
- } catch (final EOFException eof) {
- }
+ latestRecords.add(truncateAttributes(record));
+ records++;
+
+ // Remove this entry from the map
+ recordToReaderMap.remove(record);
- if (nextRecord != null) {
- recordToReaderMap.put(nextRecord, reader);
+ // Get the next entry from this reader and add it to the map
+ StandardProvenanceEventRecord nextRecord = null;
+
+ try {
+ nextRecord = reader.nextRecord();
+ } catch (final EOFException eof) {
+ }
+
+ if (nextRecord != null) {
+ recordToReaderMap.put(nextRecord, reader);
+ }
}
+ } finally {
+ finishedAdding.set(true);
+ exec.shutdown();
}
- indexWriter.commit();
- indexConfig.setMaxIdIndexed(maxId);
+ for (final Future<?> future : futures) {
+ try {
+ future.get();
+ } catch (final ExecutionException ee) {
+ final Throwable t = ee.getCause();
+ if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ }
+
+ throw new RuntimeException(t);
+ } catch (final InterruptedException e) {
+ throw new RuntimeException("Thread interrupted");
+ }
+ }
} finally {
indexManager.returnIndexWriter(indexingDirectory, indexWriter);
}
+
+ indexConfig.setMaxIdIndexed(maxId);
}
// record should now be available in the repository. We can copy the values from latestRecords to ringBuffer.
@@ -1402,6 +1487,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
if (records == 0) {
writerFile.delete();
+ logger.debug("Couldn't merge journals: No Records to merge");
return null;
} else {
final long nanos = System.nanoTime() - startNanos;
http://git-wip-us.apache.org/repos/asf/nifi/blob/f260ec76/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
index 381d778..e63133a 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
@@ -16,14 +16,14 @@
*/
package org.apache.nifi.provenance;
-import org.apache.nifi.provenance.search.SearchableField;
-
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.apache.nifi.provenance.search.SearchableField;
+
public class RepositoryConfiguration {
private final List<File> storageDirectories = new ArrayList<>();
@@ -40,7 +40,8 @@ public class RepositoryConfiguration {
private List<SearchableField> searchableAttributes = new ArrayList<>();
private boolean compress = true;
private boolean alwaysSync = false;
- private int queryThreadPoolSize = 1;
+ private int queryThreadPoolSize = 2;
+ private int indexThreadPoolSize = 1;
private boolean allowRollover = true;
public void setAllowRollover(final boolean allow) {
@@ -204,6 +205,20 @@ public class RepositoryConfiguration {
}
/**
+ * @return the number of threads to use to index provenance events
+ */
+ public int getIndexThreadPoolSize() {
+ return indexThreadPoolSize;
+ }
+
+ public void setIndexThreadPoolSize(final int indexThreadPoolSize) {
+ if (indexThreadPoolSize < 1) {
+ throw new IllegalArgumentException();
+ }
+ this.indexThreadPoolSize = indexThreadPoolSize;
+ }
+
+ /**
* <p>
* Specifies the desired size of each Provenance Event index shard, in
* bytes. We shard the index for a few reasons:
@@ -213,22 +228,21 @@ public class RepositoryConfiguration {
* <li>
* A very large index requires a significant amount of Java heap space to
* search. As the size of the shard increases, the required Java heap space
- * also increases.
- * </li>
+ * also increases.</li>
* <li>
* By having multiple shards, we have the ability to use multiple concurrent
* threads to search the individual shards, resulting in far less latency
- * when performing a search across millions or billions of records.
- * </li>
+ * when performing a search across millions or billions of records.</li>
* <li>
* We keep track of which time ranges each index shard spans. As a result,
* we are able to determine which shards need to be searched if a search
* provides a date range. This can greatly increase the speed of a search
- * and reduce resource utilization.
- * </li>
+ * and reduce resource utilization.</li>
* </ol>
*
- * @param bytes the number of bytes to write to an index before beginning a new shard
+ * @param bytes
+ * the number of bytes to write to an index before beginning a
+ * new shard
*/
public void setDesiredIndexSize(final long bytes) {
this.desiredIndexBytes = bytes;