Posted to commits@nifi.apache.org by ma...@apache.org on 2015/07/30 15:55:24 UTC

nifi git commit: NIFI-793: Added multi-threading to the indexing in the Persistent Provenance Repository

Repository: nifi
Updated Branches:
  refs/heads/NIFI-793 [created] f260ec760


NIFI-793: Added multi-threading to the indexing in the Persistent Provenance Repository


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f260ec76
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f260ec76
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f260ec76

Branch: refs/heads/NIFI-793
Commit: f260ec760241f8aeaaa96e97e40282e603f6cea2
Parents: 75ed16c
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jul 29 14:28:10 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Jul 30 09:54:50 2015 -0400

----------------------------------------------------------------------
 nifi/nifi-assembly/pom.xml                      |   1 +
 .../org/apache/nifi/util/NiFiProperties.java    |   1 +
 .../src/main/resources/conf/nifi.properties     |   1 +
 .../PersistentProvenanceRepository.java         | 146 +++++++++++++++----
 .../provenance/RepositoryConfiguration.java     |  34 +++--
 5 files changed, 143 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f260ec76/nifi/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-assembly/pom.xml b/nifi/nifi-assembly/pom.xml
index 60a6545..34d6c25 100644
--- a/nifi/nifi-assembly/pom.xml
+++ b/nifi/nifi-assembly/pom.xml
@@ -280,6 +280,7 @@ language governing permissions and limitations under the License. -->
         <nifi.provenance.repository.rollover.time>30 secs</nifi.provenance.repository.rollover.time>
         <nifi.provenance.repository.rollover.size>100 MB</nifi.provenance.repository.rollover.size>
         <nifi.provenance.repository.query.threads>2</nifi.provenance.repository.query.threads>
+        <nifi.provenance.repository.index.threads>1</nifi.provenance.repository.index.threads>
         <nifi.provenance.repository.compress.on.rollover>true</nifi.provenance.repository.compress.on.rollover>
         <nifi.provenance.repository.indexed.fields>EventType, FlowFileUUID, Filename, ProcessorID, Relationship</nifi.provenance.repository.indexed.fields> 
         <nifi.provenance.repository.indexed.attributes />

http://git-wip-us.apache.org/repos/asf/nifi/blob/f260ec76/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index e25f5d6..520e0ba 100644
--- a/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ b/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -99,6 +99,7 @@ public class NiFiProperties extends Properties {
     public static final String PROVENANCE_ROLLOVER_TIME = "nifi.provenance.repository.rollover.time";
     public static final String PROVENANCE_ROLLOVER_SIZE = "nifi.provenance.repository.rollover.size";
     public static final String PROVENANCE_QUERY_THREAD_POOL_SIZE = "nifi.provenance.repository.query.threads";
+    public static final String PROVENANCE_INDEX_THREAD_POOL_SIZE = "nifi.provenance.repository.index.threads";
     public static final String PROVENANCE_COMPRESS_ON_ROLLOVER = "nifi.provenance.repository.compress.on.rollover";
     public static final String PROVENANCE_INDEXED_FIELDS = "nifi.provenance.repository.indexed.fields";
     public static final String PROVENANCE_INDEXED_ATTRIBUTES = "nifi.provenance.repository.indexed.attributes";
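
For reference, a minimal sketch of how the new constant is resolved (the singleton accessor and variable names here are illustrative; the actual wiring into the repository constructor appears in the PersistentProvenanceRepository hunk further below):

    // Sketch only: look up the new index-thread count, falling back to a single
    // indexing thread when nifi.provenance.repository.index.threads is not set.
    final NiFiProperties properties = NiFiProperties.getInstance();
    final int indexThreads = properties.getIntegerProperty(
            NiFiProperties.PROVENANCE_INDEX_THREAD_POOL_SIZE, 1);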

http://git-wip-us.apache.org/repos/asf/nifi/blob/f260ec76/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 4043076..63e5391 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -71,6 +71,7 @@ nifi.provenance.repository.max.storage.size=${nifi.provenance.repository.max.sto
 nifi.provenance.repository.rollover.time=${nifi.provenance.repository.rollover.time}
 nifi.provenance.repository.rollover.size=${nifi.provenance.repository.rollover.size}
 nifi.provenance.repository.query.threads=${nifi.provenance.repository.query.threads}
+nifi.provenance.repository.index.threads=${nifi.provenance.repository.index.threads}
 nifi.provenance.repository.compress.on.rollover=${nifi.provenance.repository.compress.on.rollover}
 nifi.provenance.repository.always.sync=${nifi.provenance.repository.always.sync}
 nifi.provenance.repository.journal.count=${nifi.provenance.repository.journal.count}
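
As a usage note (illustrative, not part of the commit): once the ${...} placeholders are resolved at assembly time, an operator who wants more indexing parallelism can raise the new property alongside the existing query-thread setting in conf/nifi.properties, for example:

    # conf/nifi.properties -- example values; the assembly default for index.threads is 1
    nifi.provenance.repository.query.threads=2
    nifi.provenance.repository.index.threads=4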

http://git-wip-us.apache.org/repos/asf/nifi/blob/f260ec76/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 81d883a..4408e3d 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -45,6 +46,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -95,6 +97,7 @@ import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.RingBuffer;
 import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
 import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -278,6 +281,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         final String rolloverSize = properties.getProperty(NiFiProperties.PROVENANCE_ROLLOVER_SIZE, "100 MB");
         final String shardSize = properties.getProperty(NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE, "500 MB");
         final int queryThreads = properties.getIntegerProperty(NiFiProperties.PROVENANCE_QUERY_THREAD_POOL_SIZE, 2);
+        final int indexThreads = properties.getIntegerProperty(
+                NiFiProperties.PROVENANCE_INDEX_THREAD_POOL_SIZE, 1);
         final int journalCount = properties.getIntegerProperty(NiFiProperties.PROVENANCE_JOURNAL_COUNT, 16);
 
         final long storageMillis = FormatUtils.getTimeDuration(storageTime, TimeUnit.MILLISECONDS);
@@ -326,6 +331,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         config.setMaxRecordLife(storageMillis, TimeUnit.MILLISECONDS);
         config.setMaxStorageCapacity(maxStorageBytes);
         config.setQueryThreadPoolSize(queryThreads);
+        config.setIndexThreadPoolSize(indexThreads);
         config.setJournalCount(journalCount);
         config.setMaxAttributeChars(maxAttrChars);
 
@@ -795,7 +801,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
      *
      * @throws IOException if unable to purge old events due to an I/O problem
      */
-    void purgeOldEvents() throws IOException {
+    synchronized void purgeOldEvents() throws IOException {
         while (!recoveryFinished.get()) {
             try {
                 Thread.sleep(100L);
@@ -1009,6 +1015,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                         }
 
                         if (fileRolledOver == null) {
+                            logger.debug("Couldn't merge journals. Will try again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
                             return;
                         }
                         final File file = fileRolledOver;
@@ -1063,19 +1070,31 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
             if (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
                 logger.warn("The rate of the dataflow is exceeding the provenance recording rate. "
-                        + "Slowing down flow to accomodate. Currently, there are {} journal files ({} bytes) and "
+                        + "Slowing down flow to accommodate. Currently, there are {} journal files ({} bytes) and "
                         + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
                 eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is "
-                        + "exceeding the provenance recording rate. Slowing down flow to accomodate");
+                        + "exceeding the provenance recording rate. Slowing down flow to accommodate");
 
                 while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
-                    try {
-                        Thread.sleep(1000L);
-                    } catch (final InterruptedException ie) {
+                    if (repoSize > sizeThreshold) {
+                        logger.debug("Provenance Repository has exceeded its size threshold; will trigger purging of oldest events");
+                        purgeOldEvents();
+
+                        journalFileCount = getJournalCount();
+                        repoSize = getSize(getLogFiles(), 0L);
+                        continue;
+                    } else {
+                        // if we are constrained by the number of journal files rather than the size of the repo,
+                        // then we will just sleep a bit because another thread is already actively merging the journals,
+                        // due to the runnable that we scheduled above
+                        try {
+                            Thread.sleep(100L);
+                        } catch (final InterruptedException ie) {
+                        }
                     }
 
                     logger.debug("Provenance Repository is still behind. Keeping flow slowed down "
-                            + "to accomodate. Currently, there are {} journal files ({} bytes) and "
+                            + "to accommodate. Currently, there are {} journal files ({} bytes) and "
                             + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
 
                     journalFileCount = getJournalCount();
@@ -1169,6 +1188,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         }
 
         if (journalFiles.isEmpty()) {
+            logger.debug("Couldn't merge journals: Journal Files is empty; won't merge journals");
             return null;
         }
 
@@ -1328,45 +1348,110 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 final IndexingAction indexingAction = new IndexingAction(this);
 
                 final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile, earliestTimestamp);
+                long maxId = 0L;
+
+                final BlockingQueue<Tuple<StandardProvenanceEventRecord, Integer>> eventQueue = new LinkedBlockingQueue<>(100);
+                final AtomicBoolean finishedAdding = new AtomicBoolean(false);
+                final List<Future<?>> futures = new ArrayList<>();
+
                 final IndexWriter indexWriter = indexManager.borrowIndexWriter(indexingDirectory);
                 try {
-                    long maxId = 0L;
+                    final ExecutorService exec = Executors.newFixedThreadPool(configuration.getIndexThreadPoolSize(), new ThreadFactory() {
+                        @Override
+                        public Thread newThread(final Runnable r) {
+                            final Thread t = Executors.defaultThreadFactory().newThread(r);
+                            t.setName("Index Provenance Events");
+                            return t;
+                        }
+                    });
 
-                    while (!recordToReaderMap.isEmpty()) {
-                        final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
-                        final StandardProvenanceEventRecord record = entry.getKey();
-                        final RecordReader reader = entry.getValue();
+                    try {
+                        for (int i = 0; i < 6; i++) {
+                            final Callable<Object> callable = new Callable<Object>() {
+                                @Override
+                                public Object call() throws IOException {
+                                    while (!eventQueue.isEmpty() || !finishedAdding.get()) {
+                                        final Tuple<StandardProvenanceEventRecord, Integer> tuple;
+                                        try {
+                                            tuple = eventQueue.poll(10, TimeUnit.MILLISECONDS);
+                                        } catch (final InterruptedException ie) {
+                                            continue;
+                                        }
+
+                                        if (tuple == null) {
+                                            continue;
+                                        }
+
+                                        indexingAction.index(tuple.getKey(), indexWriter, tuple.getValue());
+                                    }
 
-                        writer.writeRecord(record, record.getEventId());
-                        final int blockIndex = writer.getTocWriter().getCurrentBlockIndex();
+                                    return null;
+                                }
+                            };
 
-                        indexingAction.index(record, indexWriter, blockIndex);
-                        maxId = record.getEventId();
+                            final Future<?> future = exec.submit(callable);
+                            futures.add(future);
+                        }
 
-                        latestRecords.add(truncateAttributes(record));
-                        records++;
+                        while (!recordToReaderMap.isEmpty()) {
+                            final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
+                            final StandardProvenanceEventRecord record = entry.getKey();
+                            final RecordReader reader = entry.getValue();
 
-                        // Remove this entry from the map
-                        recordToReaderMap.remove(record);
+                            writer.writeRecord(record, record.getEventId());
+                            final int blockIndex = writer.getTocWriter().getCurrentBlockIndex();
 
-                        // Get the next entry from this reader and add it to the map
-                        StandardProvenanceEventRecord nextRecord = null;
+                            boolean accepted = false;
+                            while (!accepted) {
+                                try {
+                                    accepted = eventQueue.offer(new Tuple<>(record, blockIndex), 10, TimeUnit.MILLISECONDS);
+                                } catch (final InterruptedException ie) {
+                                }
+                            }
+                            maxId = record.getEventId();
 
-                        try {
-                            nextRecord = reader.nextRecord();
-                        } catch (final EOFException eof) {
-                        }
+                            latestRecords.add(truncateAttributes(record));
+                            records++;
+
+                            // Remove this entry from the map
+                            recordToReaderMap.remove(record);
 
-                        if (nextRecord != null) {
-                            recordToReaderMap.put(nextRecord, reader);
+                            // Get the next entry from this reader and add it to the map
+                            StandardProvenanceEventRecord nextRecord = null;
+
+                            try {
+                                nextRecord = reader.nextRecord();
+                            } catch (final EOFException eof) {
+                            }
+
+                            if (nextRecord != null) {
+                                recordToReaderMap.put(nextRecord, reader);
+                            }
                         }
+                    } finally {
+                        finishedAdding.set(true);
+                        exec.shutdown();
                     }
 
-                    indexWriter.commit();
-                    indexConfig.setMaxIdIndexed(maxId);
+                    for (final Future<?> future : futures) {
+                        try {
+                            future.get();
+                        } catch (final ExecutionException ee) {
+                            final Throwable t = ee.getCause();
+                            if (t instanceof RuntimeException) {
+                                throw (RuntimeException) t;
+                            }
+
+                            throw new RuntimeException(t);
+                        } catch (final InterruptedException e) {
+                            throw new RuntimeException("Thread interrupted");
+                        }
+                    }
                 } finally {
                     indexManager.returnIndexWriter(indexingDirectory, indexWriter);
                 }
+
+                indexConfig.setMaxIdIndexed(maxId);
             }
 
             // record should now be available in the repository. We can copy the values from latestRecords to ringBuffer.
@@ -1402,6 +1487,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
         if (records == 0) {
             writerFile.delete();
+            logger.debug("Couldn't merge journals: No Records to merge");
             return null;
         } else {
             final long nanos = System.nanoTime() - startNanos;
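
To summarize the indexing change above in isolation (a simplified, self-contained sketch rather than the repository code itself; the class name, the String stand-in events, and the hard-coded thread count are illustrative): the merge thread now acts as a producer that writes each record to the event log and then hands it to a bounded queue, while a pool of worker threads drains the queue and performs the indexing until the producer signals that it has finished adding.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class IndexingSketch {
        public static void main(final String[] args) throws Exception {
            final int indexThreads = 2; // stands in for nifi.provenance.repository.index.threads
            final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>(100); // bounded: applies back-pressure
            final AtomicBoolean finishedAdding = new AtomicBoolean(false);
            final ExecutorService exec = Executors.newFixedThreadPool(indexThreads);
            final List<Future<?>> futures = new ArrayList<>();

            // Consumers: keep draining until the producer is done AND the queue is empty.
            for (int i = 0; i < indexThreads; i++) {
                futures.add(exec.submit(() -> {
                    while (!eventQueue.isEmpty() || !finishedAdding.get()) {
                        final String event;
                        try {
                            event = eventQueue.poll(10, TimeUnit.MILLISECONDS);
                        } catch (final InterruptedException ie) {
                            continue;
                        }
                        if (event == null) {
                            continue;
                        }
                        // In the real repository this is indexingAction.index(record, indexWriter, blockIndex).
                        System.out.println(Thread.currentThread().getName() + " indexed " + event);
                    }
                    return null;
                }));
            }

            // Producer: write each record to the event log (omitted) and queue it for indexing,
            // retrying the offer so a full queue slows the producer instead of dropping events.
            try {
                for (int id = 0; id < 1_000; id++) {
                    final String event = "event-" + id;
                    boolean accepted = false;
                    while (!accepted) {
                        try {
                            accepted = eventQueue.offer(event, 10, TimeUnit.MILLISECONDS);
                        } catch (final InterruptedException ie) {
                        }
                    }
                }
            } finally {
                finishedAdding.set(true);
                exec.shutdown();
            }

            // Wait for every indexing worker before publishing the max indexed id (omitted here).
            for (final Future<?> future : futures) {
                future.get();
            }
        }
    }

Compared with the previous single pass that indexed each record inline, this arrangement lets several threads share the index writer while the merge thread keeps writing the event log, and the bounded queue keeps the producer from racing ahead of the indexers.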

http://git-wip-us.apache.org/repos/asf/nifi/blob/f260ec76/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
index 381d778..e63133a 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
@@ -16,14 +16,14 @@
  */
 package org.apache.nifi.provenance;
 
-import org.apache.nifi.provenance.search.SearchableField;
-
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.provenance.search.SearchableField;
+
 public class RepositoryConfiguration {
 
     private final List<File> storageDirectories = new ArrayList<>();
@@ -40,7 +40,8 @@ public class RepositoryConfiguration {
     private List<SearchableField> searchableAttributes = new ArrayList<>();
     private boolean compress = true;
     private boolean alwaysSync = false;
-    private int queryThreadPoolSize = 1;
+    private int queryThreadPoolSize = 2;
+    private int indexThreadPoolSize = 1;
     private boolean allowRollover = true;
 
     public void setAllowRollover(final boolean allow) {
@@ -204,6 +205,20 @@ public class RepositoryConfiguration {
     }
 
     /**
+     * @return the number of threads to use to index provenance events
+     */
+    public int getIndexThreadPoolSize() {
+        return indexThreadPoolSize;
+    }
+
+    public void setIndexThreadPoolSize(final int indexThreadPoolSize) {
+        if (indexThreadPoolSize < 1) {
+            throw new IllegalArgumentException();
+        }
+        this.indexThreadPoolSize = indexThreadPoolSize;
+    }
+
+    /**
      * <p>
      * Specifies the desired size of each Provenance Event index shard, in
      * bytes. We shard the index for a few reasons:
@@ -213,22 +228,21 @@ public class RepositoryConfiguration {
      * <li>
      * A very large index requires a significant amount of Java heap space to
      * search. As the size of the shard increases, the required Java heap space
-     * also increases.
-     * </li>
+     * also increases.</li>
      * <li>
      * By having multiple shards, we have the ability to use multiple concurrent
      * threads to search the individual shards, resulting in far less latency
-     * when performing a search across millions or billions of records.
-     * </li>
+     * when performing a search across millions or billions of records.</li>
      * <li>
      * We keep track of which time ranges each index shard spans. As a result,
      * we are able to determine which shards need to be searched if a search
      * provides a date range. This can greatly increase the speed of a search
-     * and reduce resource utilization.
-     * </li>
+     * and reduce resource utilization.</li>
      * </ol>
      *
-     * @param bytes the number of bytes to write to an index before beginning a new shard
+     * @param bytes
+     *            the number of bytes to write to an index before beginning a
+     *            new shard
      */
     public void setDesiredIndexSize(final long bytes) {
         this.desiredIndexBytes = bytes;