Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/22 15:57:30 UTC

[1/7] incubator-nifi git commit: NIFI-527: Refactored the serialization format of the persistent prov repo to use compression blocks and index them

Repository: incubator-nifi
Updated Branches:
  refs/heads/improve-prov-performance [created] 64132affb


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
new file mode 100644
index 0000000..7459fe8
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+public class TestUtil {
+    public static FlowFile createFlowFile(final long id, final long fileSize, final Map<String, String> attributes) {
+        final Map<String, String> attrCopy = new HashMap<>(attributes);
+
+        return new FlowFile() {
+            @Override
+            public long getId() {
+                return id;
+            }
+
+            @Override
+            public long getEntryDate() {
+                return System.currentTimeMillis();
+            }
+
+            @Override
+            public Set<String> getLineageIdentifiers() {
+                return new HashSet<String>();
+            }
+
+            @Override
+            public long getLineageStartDate() {
+                return System.currentTimeMillis();
+            }
+
+            @Override
+            public Long getLastQueueDate() {
+                return System.currentTimeMillis();
+            }
+
+            @Override
+            public boolean isPenalized() {
+                return false;
+            }
+
+            @Override
+            public String getAttribute(final String s) {
+                return attrCopy.get(s);
+            }
+
+            @Override
+            public long getSize() {
+                return fileSize;
+            }
+
+            @Override
+            public Map<String, String> getAttributes() {
+                return attrCopy;
+            }
+
+            @Override
+            public int compareTo(final FlowFile o) {
+                return 0;
+            }
+        };
+    }
+}
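
A quick usage sketch for the helper above (hypothetical values; note the helper copies the incoming attribute map, so later mutations of the caller's map do not leak into the stub):

    final Map<String, String> attributes = new HashMap<>();
    attributes.put("uuid", UUID.randomUUID().toString());
    attributes.put("filename", "example.txt");

    // id = 1, size = 3000 bytes
    final FlowFile flowFile = TestUtil.createFlowFile(1L, 3000L, attributes);

    assertEquals(3000L, flowFile.getSize());
    assertEquals("example.txt", flowFile.getAttribute("filename"));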

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
new file mode 100644
index 0000000..30326e7
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.UUID;
+
+import org.junit.Test;
+
+public class TestStandardTocReader {
+
+    @Test
+    public void testDetectsCompression() throws IOException {
+        final File file = new File("target/" + UUID.randomUUID().toString());
+        try (final OutputStream out = new FileOutputStream(file)) {
+            out.write(0);
+            out.write(0);
+        }
+        
+        try {
+            try (final StandardTocReader reader = new StandardTocReader(file)) {
+                assertFalse(reader.isCompressed());
+            }
+        } finally {
+            file.delete();
+        }
+        
+        
+        try (final OutputStream out = new FileOutputStream(file)) {
+            out.write(0);
+            out.write(1);
+        }
+        
+        try {
+            try (final StandardTocReader reader = new StandardTocReader(file)) {
+                assertTrue(reader.isCompressed());
+            }
+        } finally {
+            file.delete();
+        }
+    }
+    
+    
+    @Test
+    public void testGetBlockIndex() throws IOException {
+        final File file = new File("target/" + UUID.randomUUID().toString());
+        try (final OutputStream out = new FileOutputStream(file);
+             final DataOutputStream dos = new DataOutputStream(out)) {
+            out.write(0);
+            out.write(0);
+            
+            for (int i=0; i < 1024; i++) {
+                dos.writeLong(i * 1024L);
+            }
+        }
+        
+        try {
+            try (final StandardTocReader reader = new StandardTocReader(file)) {
+                assertFalse(reader.isCompressed());
+                
+                for (int i=0; i < 1024; i++) {
+                    assertEquals(i * 1024, reader.getBlockOffset(i));
+                }
+            }
+        } finally {
+            file.delete();
+        }
+    }
+}
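
The two tests above pin down the TOC file layout closely enough to sketch it independently: two header bytes (the tests write 0 for the first; the second is the compression flag, 0 or 1), followed by one big-endian long per compression-block offset. A minimal standalone reader under exactly those assumptions -- not the StandardTocReader source:

    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    public class TocLayoutSketch {
        // Reads the layout the tests exercise: 2 header bytes, then 8-byte block offsets.
        public static List<Long> readBlockOffsets(final File tocFile) throws IOException {
            final List<Long> offsets = new ArrayList<>();
            try (final DataInputStream dis = new DataInputStream(new FileInputStream(tocFile))) {
                dis.read();                                  // first header byte (written as 0 above)
                final boolean compressed = dis.read() == 1;  // second header byte: 0 = plain, 1 = compressed
                while (true) {
                    try {
                        offsets.add(dis.readLong());         // byte offset of each compression block
                    } catch (final EOFException eof) {
                        break;                               // no more blocks
                    }
                }
            }
            return offsets;
        }
    }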

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
new file mode 100644
index 0000000..267d053
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.UUID;
+
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestStandardTocWriter {
+    @Test
+    public void testOverwriteEmptyFile() throws IOException {
+        final File tocFile = new File("target/" + UUID.randomUUID().toString() + ".toc");
+        try {
+            assertTrue(tocFile.createNewFile());
+            
+            try (final StandardTocWriter writer = new StandardTocWriter(tocFile, false, false)) {
+            }
+        } finally {
+            FileUtils.deleteFile(tocFile, false);
+        }
+    }
+    
+    @Test
+    public void testDoNotOverwriteNonEmptyFile() throws IOException {
+        final File tocFile = new File("target/" + UUID.randomUUID().toString() + ".toc");
+        try {
+            assertTrue(tocFile.createNewFile());
+            
+            try (final StandardTocWriter writer = new StandardTocWriter(tocFile, false, false)) {
+                writer.addBlockOffset(0L);
+                writer.addBlockOffset(34L);
+            }
+            
+            try (final StandardTocWriter writer = new StandardTocWriter(tocFile, false, false)) {
+                Assert.fail("StandardTocWriter attempted to overwrite existing file");
+            } catch (final FileAlreadyExistsException faee) {
+                // expected
+            }
+        } finally {
+            FileUtils.deleteFile(tocFile, false);
+        }
+    }
+}
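
Mirroring the reader sketch after TestStandardTocReader above, the write side these tests imply is just the two header bytes followed by one long per block offset; the second test additionally shows that StandardTocWriter throws FileAlreadyExistsException rather than overwrite a TOC that already has content. A minimal sketch under the same layout assumptions:

    import java.io.DataOutputStream;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;

    public class TocLayoutWriteSketch {
        public static void writeToc(final File tocFile, final boolean compressed, final long[] blockOffsets) throws IOException {
            try (final DataOutputStream dos = new DataOutputStream(new FileOutputStream(tocFile))) {
                dos.write(0);                    // first header byte, matching what the reader tests write
                dos.write(compressed ? 1 : 0);   // compression flag: 0 = plain, 1 = compressed
                for (final long offset : blockOffsets) {
                    dos.writeLong(offset);       // one big-endian long per compression block
                }
            }
        }
    }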


[5/7] incubator-nifi git commit: NIFI-527: Cleaned up log messages

Posted by ma...@apache.org.
NIFI-527: Cleaned up log messages


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/2f7b2ef3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/2f7b2ef3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/2f7b2ef3

Branch: refs/heads/improve-prov-performance
Commit: 2f7b2ef30091334d40655485d147aa1a571fd6b9
Parents: 9a9973a
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Apr 21 10:54:33 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Apr 21 10:54:33 2015 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/provenance/lucene/IndexManager.java   | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2f7b2ef3/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
index 0d93f3b..3943504 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
@@ -141,7 +141,8 @@ public class IndexManager implements Closeable {
 			
 			try {
 				if ( count == null ) {
-					logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. This could potentially lead to a resource leak", writer, indexingDirectory);
+					logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
+							+ "This could potentially lead to a resource leak", writer, indexingDirectory);
 					writer.close();
 				} else if ( count.getCount() <= 1 ) {
 					// we are finished with this writer.
@@ -185,10 +186,9 @@ public class IndexManager implements Closeable {
 						if ( searcher.isCache() ) {
 							final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
 							if ( refCount <= 0 ) {
-								// if refCount == 0, then the reader has been closed, so we need to discard the
-								// searcher.
-								logger.debug("Reference count for cached Index Searcher {} is currently {}; "
-									+ "removing cached searcher", searcher.getSearcher().getIndexReader(), refCount);
+								// if refCount == 0, then the reader has been closed, so we need to discard the searcher
+								logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
+									+ "removing cached searcher", absoluteFile, refCount);
 								expired.add(searcher);
 								continue;
 							}


[7/7] incubator-nifi git commit: NIFI-524: Adding a tab for process group statistics in the Summary table.

Posted by ma...@apache.org.
NIFI-524: Adding a tab for process group statistics in the Summary table.

Signed-off-by: Mark Payne <ma...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/64132aff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/64132aff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/64132aff

Branch: refs/heads/improve-prov-performance
Commit: 64132affb6141071a155d139026775ed89f993ff
Parents: 4dafd65
Author: Matt Gilman <ma...@gmail.com>
Authored: Mon Apr 20 19:19:49 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Apr 21 11:20:19 2015 -0400

----------------------------------------------------------------------
 .../controller/status/ProcessGroupStatus.java   |  40 +-
 .../status/ClusterProcessGroupStatusDTO.java    |  89 ++++
 .../dto/status/NodeProcessGroupStatusDTO.java   |  57 +++
 .../api/dto/status/ProcessGroupStatusDTO.java   |  72 +++
 .../entity/ClusterProcessGroupStatusEntity.java |  45 ++
 .../apache/nifi/controller/FlowController.java  |  10 +
 .../org/apache/nifi/web/NiFiServiceFacade.java  |   9 +
 .../nifi/web/StandardNiFiServiceFacade.java     |  75 +++-
 .../apache/nifi/web/api/ClusterResource.java    |  38 ++
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  10 +-
 .../src/main/webapp/WEB-INF/pages/summary.jsp   |   1 +
 .../cluster-process-group-summary-dialog.jsp    |  36 ++
 .../partials/summary/summary-content.jsp        |   3 +
 .../nifi-web-ui/src/main/webapp/css/summary.css |  83 ++++
 .../src/main/webapp/images/iconProcessGroup.png | Bin 0 -> 1314 bytes
 .../webapp/js/nf/summary/nf-summary-table.js    | 446 +++++++++++++++++--
 16 files changed, 967 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64132aff/nifi/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
index dba3a19..c57169f 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
@@ -43,6 +43,8 @@ public class ProcessGroupStatus implements Cloneable {
     private long bytesReceived;
     private int flowFilesSent;
     private long bytesSent;
+    private int flowFilesTransferred;
+    private long bytesTransferred;
 
     private Collection<ConnectionStatus> connectionStatus = new ArrayList<>();
     private Collection<ProcessorStatus> processorStatus = new ArrayList<>();
@@ -227,6 +229,22 @@ public class ProcessGroupStatus implements Cloneable {
         this.bytesSent = bytesSent;
     }
 
+    public int getFlowFilesTransferred() {
+        return flowFilesTransferred;
+    }
+
+    public void setFlowFilesTransferred(int flowFilesTransferred) {
+        this.flowFilesTransferred = flowFilesTransferred;
+    }
+
+    public long getBytesTransferred() {
+        return bytesTransferred;
+    }
+
+    public void setBytesTransferred(long bytesTransferred) {
+        this.bytesTransferred = bytesTransferred;
+    }
+
     @Override
     public ProcessGroupStatus clone() {
 
@@ -248,6 +266,8 @@ public class ProcessGroupStatus implements Cloneable {
         clonedObj.bytesReceived = bytesReceived;
         clonedObj.flowFilesSent = flowFilesSent;
         clonedObj.bytesSent = bytesSent;
+        clonedObj.flowFilesTransferred = flowFilesTransferred;
+        clonedObj.bytesTransferred = bytesTransferred;
 
         if (connectionStatus != null) {
             final Collection<ConnectionStatus> statusList = new ArrayList<>();
@@ -317,6 +337,18 @@ public class ProcessGroupStatus implements Cloneable {
         builder.append(creationTimestamp);
         builder.append(", activeThreadCount=");
         builder.append(activeThreadCount);
+        builder.append(", flowFilesTransferred=");
+        builder.append(flowFilesTransferred);
+        builder.append(", bytesTransferred=");
+        builder.append(bytesTransferred);
+        builder.append(", flowFilesReceived=");
+        builder.append(flowFilesReceived);
+        builder.append(", bytesReceived=");
+        builder.append(bytesReceived);
+        builder.append(", flowFilesSent=");
+        builder.append(flowFilesSent);
+        builder.append(", bytesSent=");
+        builder.append(bytesSent);
         builder.append(",\n\tconnectionStatus=");
 
         for (final ConnectionStatus status : connectionStatus) {
@@ -374,7 +406,13 @@ public class ProcessGroupStatus implements Cloneable {
         target.setBytesRead(target.getBytesRead() + toMerge.getBytesRead());
         target.setBytesWritten(target.getBytesWritten() + toMerge.getBytesWritten());
         target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
-
+        target.setFlowFilesTransferred(target.getFlowFilesTransferred() + toMerge.getFlowFilesTransferred());
+        target.setBytesTransferred(target.getBytesTransferred() + toMerge.getBytesTransferred());
+        target.setFlowFilesReceived(target.getFlowFilesReceived() + toMerge.getFlowFilesReceived());
+        target.setBytesReceived(target.getBytesReceived() + toMerge.getBytesReceived());
+        target.setFlowFilesSent(target.getFlowFilesSent() + toMerge.getFlowFilesSent());
+        target.setBytesSent(target.getBytesSent() + toMerge.getBytesSent());
+        
         // connection status
         // sort by id
         final Map<String, ConnectionStatus> mergedConnectionMap = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64132aff/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ClusterProcessGroupStatusDTO.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ClusterProcessGroupStatusDTO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ClusterProcessGroupStatusDTO.java
new file mode 100644
index 0000000..d1929e6
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ClusterProcessGroupStatusDTO.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.dto.status;
+
+import java.util.Collection;
+import java.util.Date;
+import javax.xml.bind.annotation.XmlType;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.web.api.dto.util.TimeAdapter;
+
+/**
+ * DTO for serializing a process group's status across the cluster.
+ */
+@XmlType(name = "clusterProcessGroupStatus")
+public class ClusterProcessGroupStatusDTO {
+
+    private Collection<NodeProcessGroupStatusDTO> nodeProcessGroupStatus;
+    private Date statsLastRefreshed;
+    private String processGroupId;
+    private String processGroupName;
+
+    /**
+     * The time the status was last refreshed.
+     *
+     * @return
+     */
+    @XmlJavaTypeAdapter(TimeAdapter.class)
+    public Date getStatsLastRefreshed() {
+        return statsLastRefreshed;
+    }
+
+    public void setStatsLastRefreshed(Date statsLastRefreshed) {
+        this.statsLastRefreshed = statsLastRefreshed;
+    }
+
+    /**
+     * The process group id.
+     *
+     * @return
+     */
+    public String getProcessGroupId() {
+        return processGroupId;
+    }
+
+    public void setProcessGroupId(String processGroupId) {
+        this.processGroupId = processGroupId;
+    }
+
+    /**
+     * The process group name.
+     *
+     * @return
+     */
+    public String getProcessGroupName() {
+        return processGroupName;
+    }
+
+    public void setProcessGroupName(String processGroupName) {
+        this.processGroupName = processGroupName;
+    }
+
+    /**
+     * Collection of node process group status DTO.
+     *
+     * @return The collection of node process group status DTO
+     */
+    public Collection<NodeProcessGroupStatusDTO> getNodeProcessGroupStatus() {
+        return nodeProcessGroupStatus;
+    }
+
+    public void setNodeProcessGroupStatus(Collection<NodeProcessGroupStatusDTO> nodeProcessGroupStatus) {
+        this.nodeProcessGroupStatus = nodeProcessGroupStatus;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64132aff/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/NodeProcessGroupStatusDTO.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/NodeProcessGroupStatusDTO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/NodeProcessGroupStatusDTO.java
new file mode 100644
index 0000000..ded0621
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/NodeProcessGroupStatusDTO.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.dto.status;
+
+import javax.xml.bind.annotation.XmlType;
+import org.apache.nifi.web.api.dto.NodeDTO;
+
+/**
+ * DTO for serializing the process group status for a particular node.
+ */
+@XmlType(name = "nodeProcessGroupStatus")
+public class NodeProcessGroupStatusDTO {
+
+    private NodeDTO node;
+    private ProcessGroupStatusDTO processGroupStatus;
+
+    /**
+     * The node.
+     *
+     * @return
+     */
+    public NodeDTO getNode() {
+        return node;
+    }
+
+    public void setNode(NodeDTO node) {
+        this.node = node;
+    }
+
+    /**
+     * The process group's status.
+     *
+     * @return
+     */
+    public ProcessGroupStatusDTO getProcessGroupStatus() {
+        return processGroupStatus;
+    }
+
+    public void setProcessGroupStatus(ProcessGroupStatusDTO processGroupStatus) {
+        this.processGroupStatus = processGroupStatus;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64132aff/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusDTO.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusDTO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusDTO.java
index 4fa2b64..bd8b82e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusDTO.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ProcessGroupStatusDTO.java
@@ -38,10 +38,15 @@ public class ProcessGroupStatusDTO extends StatusDTO {
     private Collection<PortStatusDTO> outputPortStatus;
 
     private String input;
+    private String queuedCount;
+    private String queuedSize;
     private String queued;
     private String read;
     private String written;
     private String output;
+    private String transferred;
+    private String received;
+    private String sent;
     private Integer activeThreadCount;
     private Date statsLastRefreshed;
 
@@ -176,6 +181,73 @@ public class ProcessGroupStatusDTO extends StatusDTO {
     }
 
     /**
+     * The transferred stats for this process group. This represents the count/size
+     * of flowfiles transferred to/from queues.
+     * 
+     * @return 
+     */
+    public String getTransferred() {
+        return transferred;
+    }
+
+    public void setTransferred(String transferred) {
+        this.transferred = transferred;
+    }
+
+    /**
+     * The received stats for this process group. This represents the count/size
+     * of flowfiles received.
+     * 
+     * @return 
+     */
+    public String getReceived() {
+        return received;
+    }
+
+    public void setReceived(String received) {
+        this.received = received;
+    }
+
+    /**
+     * The sent stats for this process group. This represents the count/size of
+     * flowfiles sent.
+     * 
+     * @return 
+     */
+    public String getSent() {
+        return sent;
+    }
+
+    public void setSent(String sent) {
+        this.sent = sent;
+    }
+
+    /**
+     * The queued count for this process group.
+     * 
+     * @return 
+     */
+    public String getQueuedCount() {
+        return queuedCount;
+    }
+
+    public void setQueuedCount(String queuedCount) {
+        this.queuedCount = queuedCount;
+    }
+
+    /**
+     * The queued size for this process group.
+     * @return 
+     */
+    public String getQueuedSize() {
+        return queuedSize;
+    }
+
+    public void setQueuedSize(String queuedSize) {
+        this.queuedSize = queuedSize;
+    }
+
+    /**
      * The queued stats for this process group.
      *
      * @return The queued stats

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64132aff/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterProcessGroupStatusEntity.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterProcessGroupStatusEntity.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterProcessGroupStatusEntity.java
new file mode 100644
index 0000000..cddb21a
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ClusterProcessGroupStatusEntity.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
+
+/**
+ * A serialized representation of this class can be placed in the entity body of
+ * a request or response to or from the API. This particular entity holds a
+ * reference to a ClusterProcessGroupStatusDTO.
+ */
+@XmlRootElement(name = "clusterProcessGroupStatusEntity")
+public class ClusterProcessGroupStatusEntity extends Entity {
+
+    private ClusterProcessGroupStatusDTO clusterProcessGroupStatus;
+
+    /**
+     * The ClusterProcessGroupStatusDTO that is being serialized.
+     *
+     * @return The ClusterProcessGroupStatusDTO object
+     */
+    public ClusterProcessGroupStatusDTO getClusterProcessGroupStatus() {
+        return clusterProcessGroupStatus;
+    }
+
+    public void setClusterProcessGroupStatus(ClusterProcessGroupStatusDTO clusterProcessGroupStatus) {
+        this.clusterProcessGroupStatus = clusterProcessGroupStatus;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64132aff/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index ec25ab1..6598204 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2066,6 +2066,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         long bytesReceived = 0L;
         int flowFilesSent = 0;
         long bytesSent = 0L;
+        int flowFilesTransferred = 0;
+        long bytesTransferred = 0;
 
         // set status for processors
         final Collection<ProcessorStatus> processorStatusCollection = new ArrayList<>();
@@ -2099,6 +2101,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             bytesReceived += childGroupStatus.getBytesReceived();
             flowFilesSent += childGroupStatus.getFlowFilesSent();
             bytesSent += childGroupStatus.getBytesSent();
+            
+            flowFilesTransferred += childGroupStatus.getFlowFilesTransferred();
+            bytesTransferred += childGroupStatus.getBytesTransferred();
         }
 
         // set status for remote child groups
@@ -2136,6 +2141,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 connStatus.setInputCount(connectionStatusReport.getFlowFilesIn());
                 connStatus.setOutputBytes(connectionStatusReport.getContentSizeOut());
                 connStatus.setOutputCount(connectionStatusReport.getFlowFilesOut());
+                
+                flowFilesTransferred += (connectionStatusReport.getFlowFilesIn() + connectionStatusReport.getFlowFilesOut());
+                bytesTransferred += (connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut());
             }
 
             if (StringUtils.isNotBlank(conn.getName())) {
@@ -2306,6 +2314,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         status.setBytesReceived(bytesReceived);
         status.setFlowFilesSent(flowFilesSent);
         status.setBytesSent(bytesSent);
+        status.setFlowFilesTransferred(flowFilesTransferred);
+        status.setBytesTransferred(bytesTransferred);
 
         return status;
     }
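
In short, the new transferred counters tally every flowfile that crossed a connection in either direction, plus the totals reported by nested child groups: a single connection that moved 100 flowfiles in (1,000,000 bytes) and 90 flowfiles out (900,000 bytes) contributes 190 flowfiles and 1,900,000 bytes to its group's transferred stats.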

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64132aff/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 8d9dade..9a35503 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -63,6 +63,7 @@ import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO;
 import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
 import org.apache.nifi.web.api.dto.status.ClusterConnectionStatusDTO;
 import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO;
+import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
 import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO;
 import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO;
 import org.apache.nifi.web.api.dto.status.ClusterStatusDTO;
@@ -1403,6 +1404,14 @@ public interface NiFiServiceFacade {
     ClusterStatusHistoryDTO getClusterProcessGroupStatusHistory(String processGroupId);
 
     /**
+     * Returns a process group's status for each node connected to the cluster.
+     *
+     * @param processGroupId a process group identifier
+     * @return The cluster process group status transfer object.
+     */
+    ClusterProcessGroupStatusDTO getClusterProcessGroupStatus(String processGroupId);
+    
+    /**
      * Returns the remote process group status history for each node connected
      * to the cluster.
      *

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64132aff/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 88637b4..082da1c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -163,6 +163,8 @@ import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
 import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
 import org.apache.nifi.web.api.dto.ReportingTaskDTO;
+import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusDTO;
 import org.apache.nifi.web.dao.ControllerServiceDAO;
 import org.apache.nifi.web.dao.ReportingTaskDAO;
 import org.slf4j.Logger;
@@ -2449,6 +2451,77 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
         return clusterConnectionStatusDto;
     }
+    
+    private ProcessGroupStatus findNodeProcessGroupStatus(final ProcessGroupStatus groupStatus, final String processGroupId) {
+        ProcessGroupStatus processGroupStatus = null;
+
+        if (processGroupId.equals(groupStatus.getId())) {
+            processGroupStatus = groupStatus;
+        }
+        
+        if (processGroupStatus == null) {
+            for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) {
+                processGroupStatus = findNodeProcessGroupStatus(status, processGroupId);
+
+                if (processGroupStatus != null) {
+                    break;
+                }
+            }
+        }
+
+        return processGroupStatus;
+    }
+    
+    @Override
+    public ClusterProcessGroupStatusDTO getClusterProcessGroupStatus(String processGroupId) {
+
+        final ClusterProcessGroupStatusDTO clusterProcessGroupStatusDto = new ClusterProcessGroupStatusDTO();
+        clusterProcessGroupStatusDto.setNodeProcessGroupStatus(new ArrayList<NodeProcessGroupStatusDTO>());
+
+        // set the current time
+        clusterProcessGroupStatusDto.setStatsLastRefreshed(new Date());
+
+        final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED);
+        boolean firstNode = true;
+        for (final Node node : nodes) {
+
+            final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
+            if (nodeHeartbeatPayload == null) {
+                continue;
+            }
+
+            final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus();
+            if (nodeStats == null || nodeStats.getProcessorStatus() == null) {
+                continue;
+            }
+
+            // attempt to find the process group stats for this node
+            final ProcessGroupStatus processGroupStatus = findNodeProcessGroupStatus(nodeStats, processGroupId);
+
+            // sanity check that we have status for this process group
+            if (processGroupStatus == null) {
+                throw new ResourceNotFoundException(String.format("Unable to find status for process group id '%s'.", processGroupId));
+            }
+
+            if (firstNode) {
+                clusterProcessGroupStatusDto.setProcessGroupId(processGroupId);
+                clusterProcessGroupStatusDto.setProcessGroupName(processGroupStatus.getName());
+                firstNode = false;
+            }
+
+            // create node process group status dto
+            final NodeProcessGroupStatusDTO nodeProcessGroupStatusDTO = new NodeProcessGroupStatusDTO();
+            clusterProcessGroupStatusDto.getNodeProcessGroupStatus().add(nodeProcessGroupStatusDTO);
+
+            // populate node process group status dto
+            final String nodeId = node.getNodeId().getId();
+            nodeProcessGroupStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
+            nodeProcessGroupStatusDTO.setProcessGroupStatus(dtoFactory.createProcessGroupStatusDto(clusterManager.getBulletinRepository(), processGroupStatus));
+
+        }
+
+        return clusterProcessGroupStatusDto;
+    }
 
     private PortStatus findNodeInputPortStatus(final ProcessGroupStatus groupStatus, final String inputPortId) {
         PortStatus portStatus = null;
@@ -2670,7 +2743,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     public ClusterStatusHistoryDTO getClusterProcessorStatusHistory(String processorId) {
         return clusterManager.getProcessorStatusHistory(processorId);
     }
-
+    
     @Override
     public ClusterStatusHistoryDTO getClusterConnectionStatusHistory(String connectionId) {
         return clusterManager.getConnectionStatusHistory(connectionId);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64132aff/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
index 3a74782..f803471 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
@@ -69,6 +69,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.springframework.security.access.prepost.PreAuthorize;
 
 import com.sun.jersey.api.core.ResourceContext;
+import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
+import org.apache.nifi.web.api.entity.ClusterProcessGroupStatusEntity;
 import org.codehaus.enunciate.jaxrs.TypeHint;
 
 /**
@@ -535,6 +537,42 @@ public class ClusterResource extends ApplicationResource {
     }
 
     /**
+     * Gets the process group status for every node.
+     *
+     * @param clientId Optional client id. If the client id is not specified, a
+     * new one will be generated. This value (whether specified or generated) is
+     * included in the response.
+     * @param id The id of the process group
+     * @return A clusterProcessGroupStatusEntity
+     */
+    @GET
+    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+    @Path("/process-groups/{id}/status")
+    @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @TypeHint(ClusterProcessGroupStatusEntity.class)
+    public Response getProcessGroupStatus(@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, @PathParam("id") String id) {
+
+        if (properties.isClusterManager()) {
+
+            final ClusterProcessGroupStatusDTO dto = serviceFacade.getClusterProcessGroupStatus(id);
+
+            // create the revision
+            RevisionDTO revision = new RevisionDTO();
+            revision.setClientId(clientId.getClientId());
+
+            // create entity
+            final ClusterProcessGroupStatusEntity entity = new ClusterProcessGroupStatusEntity();
+            entity.setClusterProcessGroupStatus(dto);
+            entity.setRevision(revision);
+
+            // generate the response
+            return generateOkResponse(entity).build();
+        }
+
+        throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
+    }
+    
+    /**
      * Gets the process group status history for every node.
      *
      * @param clientId Optional client id. If the client id is not specified, a

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64132aff/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 4e83ac5..9ee62db 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -506,13 +506,21 @@ public final class DtoFactory {
         processGroupStatusDto.setId(processGroupStatus.getId());
         processGroupStatusDto.setName(processGroupStatus.getName());
         processGroupStatusDto.setStatsLastRefreshed(new Date(processGroupStatus.getCreationTimestamp()));
-        processGroupStatusDto.setQueued(formatCount(processGroupStatus.getQueuedCount()) + " / " + formatDataSize(processGroupStatus.getQueuedContentSize()));
         processGroupStatusDto.setRead(formatDataSize(processGroupStatus.getBytesRead()));
         processGroupStatusDto.setWritten(formatDataSize(processGroupStatus.getBytesWritten()));
         processGroupStatusDto.setInput(formatCount(processGroupStatus.getInputCount()) + " / " + formatDataSize(processGroupStatus.getInputContentSize()));
         processGroupStatusDto.setOutput(formatCount(processGroupStatus.getOutputCount()) + " / " + formatDataSize(processGroupStatus.getOutputContentSize()));
+        processGroupStatusDto.setTransferred(formatCount(processGroupStatus.getFlowFilesTransferred()) + " / " + formatDataSize(processGroupStatus.getBytesTransferred()));
+        processGroupStatusDto.setSent(formatCount(processGroupStatus.getFlowFilesSent()) + " / " + formatDataSize(processGroupStatus.getBytesSent()));
+        processGroupStatusDto.setReceived(formatCount(processGroupStatus.getFlowFilesReceived()) + " / " + formatDataSize(processGroupStatus.getBytesReceived()));
         processGroupStatusDto.setActiveThreadCount(processGroupStatus.getActiveThreadCount());
 
+        final String queuedCount = FormatUtils.formatCount(processGroupStatus.getQueuedCount());
+        final String queuedSize = FormatUtils.formatDataSize(processGroupStatus.getQueuedContentSize());
+        processGroupStatusDto.setQueuedCount(queuedCount);
+        processGroupStatusDto.setQueuedSize(queuedSize);
+        processGroupStatusDto.setQueued(queuedCount + " / " + queuedSize);
+        
         final Map<String, StatusDTO> componentStatusDtoMap = new HashMap<>();
 
         // processor status
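
The DTO now exposes the queued count and size both individually and as the combined display string the Summary table renders. Illustrative only -- the exact strings depend on FormatUtils:

    final String queuedCount = FormatUtils.formatCount(1234);          // e.g. "1,234"
    final String queuedSize = FormatUtils.formatDataSize(1048576);     // e.g. "1 MB"
    processGroupStatusDto.setQueued(queuedCount + " / " + queuedSize); // e.g. "1,234 / 1 MB"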

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64132aff/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/pages/summary.jsp
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/pages/summary.jsp b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/pages/summary.jsp
index 032509b..e6f3305 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/pages/summary.jsp
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/pages/summary.jsp
@@ -74,6 +74,7 @@
         <jsp:include page="/WEB-INF/partials/summary/cluster-output-port-summary-dialog.jsp"/>
         <jsp:include page="/WEB-INF/partials/summary/cluster-remote-process-group-summary-dialog.jsp"/>
         <jsp:include page="/WEB-INF/partials/summary/cluster-connection-summary-dialog.jsp"/>
+        <jsp:include page="/WEB-INF/partials/summary/cluster-process-group-summary-dialog.jsp"/>
         <jsp:include page="/WEB-INF/partials/summary/system-diagnostics-dialog.jsp"/>
         <jsp:include page="/WEB-INF/partials/summary/view-single-node-dialog.jsp"/>
         <div id="faded-background"></div>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64132aff/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/summary/cluster-process-group-summary-dialog.jsp
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/summary/cluster-process-group-summary-dialog.jsp b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/summary/cluster-process-group-summary-dialog.jsp
new file mode 100644
index 0000000..94526d0
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/summary/cluster-process-group-summary-dialog.jsp
@@ -0,0 +1,36 @@
+<%--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+--%>
+<%@ page contentType="text/html" pageEncoding="UTF-8" session="false" %>
+<div id="cluster-process-group-summary-dialog">
+    <div class="dialog-content">
+        <div id="cluster-process-group-summary-header">
+            <div id="cluster-process-group-refresh-button" class="summary-refresh pointer" title="Refresh"></div>
+            <div id="cluster-process-group-summary-last-refreshed-container">
+                Last updated:&nbsp;<span id="cluster-process-group-summary-last-refreshed"></span>
+            </div>
+            <div id="cluster-process-group-summary-loading-container" class="loading-container"></div>
+            <div id="cluster-process-group-details-container">
+                <div id="cluster-process-group-icon"></div>
+                <div id="cluster-process-group-details">
+                    <div id="cluster-process-group-name"></div>
+                    <div id="cluster-process-group-id"></div>
+                </div>
+            </div>
+        </div>
+        <div id="cluster-process-group-summary-table"></div>
+    </div>
+</div>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64132aff/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/summary/summary-content.jsp
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/summary/summary-content.jsp b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/summary/summary-content.jsp
index a419baa..5be3e2b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/summary/summary-content.jsp
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/summary/summary-content.jsp
@@ -51,6 +51,9 @@
         <div id="connection-summary-tab-content" class="configuration-tab">
             <div id="connection-summary-table" class="summary-table"></div>
         </div>
+        <div id="process-group-summary-tab-content" class="configuration-tab">
+            <div id="process-group-summary-table" class="summary-table"></div>
+        </div>
         <div id="input-port-summary-tab-content" class="configuration-tab">
             <div id="input-port-summary-table" class="summary-table"></div>
         </div>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64132aff/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/summary.css
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/summary.css b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/summary.css
index bb7b9c4..e882f89 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/summary.css
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/summary.css
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 /*
     Status Styles
 */
@@ -386,6 +387,12 @@ span.sorted {
     text-decoration: underline;
 }
 
+/* tooltips in the summary table */
+
+#summary .nifi-tooltip {
+    max-width: 500px;
+}
+
 /* cluster processor summary table */
 
 #cluster-processor-summary-dialog {
@@ -765,4 +772,80 @@ span.sorted {
     white-space: nowrap;
     overflow: hidden;
     width: 200px;
+}
+
+/* cluster process group summary table */
+
+#cluster-process-group-summary-dialog {
+    display: none;
+    width: 778px;
+    height: 450px;
+    z-index: 1301;
+}
+
+#cluster-process-group-summary-table {
+    width: 758px;
+    height: 300px;
+    border-bottom: 1px solid #666;
+}
+
+#cluster-process-group-summary-header {
+    height: 26px;
+    color: #666;
+    font-weight: normal;
+    margin-bottom: 1px;
+}
+
+#cluster-process-group-refresh-button {
+    height: 24px;
+    width: 26px;
+    float: left;
+}
+
+#cluster-process-group-summary-last-refreshed-container {
+    float: left;
+    margin-top: 6px;
+    margin-left: 3px;
+    -webkit-user-select: none;
+    -moz-user-select: none;
+}
+
+#cluster-process-group-summary-last-refreshed {
+    font-weight: bold;
+}
+
+#cluster-process-group-summary-loading-container {
+    float: left;
+    width: 16px;
+    height: 16px;
+    background-color: transparent;
+    margin-top: 4px;
+    margin-left: 3px;
+}
+
+#cluster-process-group-details-container {
+    position: absolute;
+    right: 35px;
+}
+
+#cluster-process-group-icon {
+    background-image: url(../images/iconProcessGroup.png);
+    width: 29px;
+    height: 20px;
+    float: left;
+    margin-right: 5px;
+    margin-top: 1px;
+}
+
+#cluster-process-group-details {
+    float: left;
+}
+
+#cluster-process-group-name {
+    margin-bottom: 2px;
+    color: #000;
+    font-weight: bold;
+    white-space: nowrap;
+    overflow: hidden;
+    width: 200px;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64132aff/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/images/iconProcessGroup.png
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/images/iconProcessGroup.png b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/images/iconProcessGroup.png
new file mode 100644
index 0000000..4ff5ac5
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/images/iconProcessGroup.png differ

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/64132aff/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/summary/nf-summary-table.js
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/summary/nf-summary-table.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/summary/nf-summary-table.js
index 2bd94d5..75ef3f1 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/summary/nf-summary-table.js
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/summary/nf-summary-table.js
@@ -31,6 +31,7 @@ nf.SummaryTable = (function () {
             processGroups: '../nifi-api/controller/process-groups/',
             clusterProcessor: '../nifi-api/cluster/processors/',
             clusterConnection: '../nifi-api/cluster/connections/',
+            clusterProcessGroup: '../nifi-api/cluster/process-groups/',
             clusterInputPort: '../nifi-api/cluster/input-ports/',
             clusterOutputPort: '../nifi-api/cluster/output-ports/',
             clusterRemoteProcessGroup: '../nifi-api/cluster/remote-process-groups/',
@@ -130,6 +131,9 @@ nf.SummaryTable = (function () {
                 }, {
                     name: 'Connections',
                     tabContentId: 'connection-summary-tab-content'
+                }, {
+                    name: 'Process Groups',
+                    tabContentId: 'process-group-summary-tab-content'
                 }],
             select: function () {
                 var tab = $(this).text();
@@ -190,12 +194,12 @@ nf.SummaryTable = (function () {
                     if (nf.Common.isDefinedAndNotNull(inputPortsGrid)) {
                         inputPortsGrid.resizeCanvas();
 
-                        // update the total number of connections
+                        // update the total number of input ports
                         $('#displayed-items').text(nf.Common.formatInteger(inputPortsGrid.getData().getLength()));
                         $('#total-items').text(nf.Common.formatInteger(inputPortsGrid.getData().getLength()));
                     }
 
-                    // update the combo for connections
+                    // update the combo for input ports
                     $('#summary-filter-type').combo({
                         options: [{
                                 text: 'by name',
@@ -211,12 +215,12 @@ nf.SummaryTable = (function () {
                     if (nf.Common.isDefinedAndNotNull(outputPortsGrid)) {
                         outputPortsGrid.resizeCanvas();
 
-                        // update the total number of connections
+                        // update the total number of output ports
                         $('#displayed-items').text(nf.Common.formatInteger(outputPortsGrid.getData().getLength()));
                         $('#total-items').text(nf.Common.formatInteger(outputPortsGrid.getData().getLength()));
                     }
 
-                    // update the combo for connections
+                    // update the combo for output ports
                     $('#summary-filter-type').combo({
                         options: [{
                                 text: 'by name',
@@ -226,18 +230,18 @@ nf.SummaryTable = (function () {
                             applyFilter();
                         }
                     });
-                } else {
+                } else if (tab === 'Remote Process Groups') {
                     // ensure the remote process group table is sized properly
                     var remoteProcessGroupsGrid = $('#remote-process-group-summary-table').data('gridInstance');
                     if (nf.Common.isDefinedAndNotNull(remoteProcessGroupsGrid)) {
                         remoteProcessGroupsGrid.resizeCanvas();
 
-                        // update the total number of connections
+                        // update the total number of remote process groups
                         $('#displayed-items').text(nf.Common.formatInteger(remoteProcessGroupsGrid.getData().getLength()));
                         $('#total-items').text(nf.Common.formatInteger(remoteProcessGroupsGrid.getData().getLength()));
                     }
 
-                    // update the combo for connections
+                    // update the combo for remote process groups
                     $('#summary-filter-type').combo({
                         options: [{
                                 text: 'by name',
@@ -250,6 +254,27 @@ nf.SummaryTable = (function () {
                             applyFilter();
                         }
                     });
+                } else {
+                    // ensure the process group table is sized properly
+                    var processGroupGrid = $('#process-group-summary-table').data('gridInstance');
+                    if (nf.Common.isDefinedAndNotNull(processGroupGrid)) {
+                        processGroupGrid.resizeCanvas();
+
+                        // update the total number of process groups
+                        $('#displayed-items').text(nf.Common.formatInteger(processGroupGrid.getData().getLength()));
+                        $('#total-items').text(nf.Common.formatInteger(processGroupGrid.getData().getLength()));
+                    }
+
+                    // update the combo for process groups
+                    $('#summary-filter-type').combo({
+                        options: [{
+                                text: 'by name',
+                                value: 'name'
+                            }],
+                        select: function (option) {
+                            applyFilter();
+                        }
+                    });
                 }
 
                 // reset the filter
@@ -458,18 +483,18 @@ nf.SummaryTable = (function () {
 
                 // show the tooltip
                 if (nf.Common.isDefinedAndNotNull(tooltip)) {
-                    bulletinIcon.qtip($.extend({
+                    bulletinIcon.qtip($.extend({}, nf.Common.config.tooltipConfig, {
                         content: tooltip,
                         position: {
-                            target: 'mouse',
-                            viewport: $(window),
+                            container: $('#summary'),
+                            at: 'bottom right',
+                            my: 'top left',
                             adjust: {
-                                x: 8,
-                                y: 8,
-                                method: 'flipinvert flipinvert'
+                                x: 4,
+                                y: 4
                             }
                         }
-                    }, nf.Common.config.tooltipConfig));
+                    }));
                 }
             }
         });
@@ -783,7 +808,7 @@ nf.SummaryTable = (function () {
         // hold onto an instance of the grid
         $('#cluster-connection-summary-table').data('gridInstance', clusterConnectionsGrid);
 
-        // define a custom formatter for showing more port details
+        // define a custom formatter for showing more port/group details
         var moreDetails = function (row, cell, value, columnDef, dataContext) {
             var markup = '';
 
@@ -794,10 +819,260 @@ nf.SummaryTable = (function () {
 
             return markup;
         };
+        
+        var moreDetailsColumn = {id: 'moreDetails', field: 'moreDetails', name: '&nbsp;', resizable: false, formatter: moreDetails, sortable: true, width: 50, maxWidth: 50};
+        var transferredColumn = {id: 'transferred', field: 'transferred', name: '<span class="transferred-title">Transferred</span>&nbsp;/&nbsp;<span class="sent-size-title">Size</span>&nbsp;<span style="font-weight: normal; overflow: hidden;">5 min</span>', toolTip: 'Count / data size transferred to and from connections in the last 5 min', resizable: true, defaultSortAsc: false, sortable: true};
+        var sentColumn = {id: 'sent', field: 'sent', name: '<span class="sent-title">Sent</span>&nbsp;/&nbsp;<span class="sent-size-title">Size</span>&nbsp;<span style="font-weight: normal; overflow: hidden;">5 min</span>', toolTip: 'Count / data size in the last 5 min', sortable: true, defaultSortAsc: false, resizable: true};
+        var receivedColumn = {id: 'received', field: 'received', name: '<span class="received-title">Received</span>&nbsp;/&nbsp;<span class="received-size-title">Size</span>&nbsp;<span style="font-weight: normal; overflow: hidden;">5 min</span>', toolTip: 'Count / data size in the last 5 min', sortable: true, defaultSortAsc: false, resizable: true};
+
+        // define the column model for the summary table
+        var processGroupsColumnModel = [
+            moreDetailsColumn,
+            {id: 'name', field: 'name', name: 'Name', sortable: true, resizable: true, formatter: valueFormatter},
+            transferredColumn,
+            inputColumn,
+            ioColumn,
+            outputColumn,
+            sentColumn,
+            receivedColumn
+        ];
+        
+        // add an action column if appropriate
+        if (isClustered || isInShell || nf.Common.SUPPORTS_SVG) {
+            // define how the column is formatted
+            var processGroupActionFormatter = function (row, cell, value, columnDef, dataContext) {
+                var markup = '';
+
+                if (isInShell && dataContext.groupId !== null) {
+                    markup += '<img src="images/iconGoTo.png" title="Go To" class="pointer go-to" style="margin-top: 2px;"/>&nbsp;';
+                }
+
+                if (nf.Common.SUPPORTS_SVG) {
+                    if (isClustered) {
+                        markup += '<img src="images/iconChart.png" title="Show History" class="pointer show-cluster-process-group-status-history" style="margin-top: 2px;"/>&nbsp;';
+                    } else {
+                        markup += '<img src="images/iconChart.png" title="Show History" class="pointer show-process-group-status-history" style="margin-top: 2px;"/>&nbsp;';
+                    }
+                }
+
+                if (isClustered) {
+                    markup += '<img src="images/iconClusterSmall.png" title="Show Details" class="pointer show-cluster-process-group-summary" style="margin-top: 2px;"/>&nbsp;';
+                }
+
+                return markup;
+            };
+
+            // define the action column for clusters and within the shell
+            processGroupsColumnModel.push({id: 'actions', name: '&nbsp;', formatter: processGroupActionFormatter, resizable: false, sortable: false, width: 75, maxWidth: 75});
+        }
+
+        // initialize the options for the process groups table
+        var processGroupsOptions = {
+            forceFitColumns: true,
+            enableTextSelectionOnCells: true,
+            enableCellNavigation: true,
+            enableColumnReorder: false,
+            autoEdit: false,
+            multiSelect: false
+        };
+
+        // initialize the dataview
+        var processGroupsData = new Slick.Data.DataView({
+            inlineFilters: false
+        });
+        processGroupsData.setItems([]);
+        processGroupsData.setFilterArgs({
+            searchString: '',
+            property: 'name'
+        });
+        processGroupsData.setFilter(filter);
+
+        // initialize the sort
+        sort('process-group-summary-table', {
+            columnId: 'name',
+            sortAsc: true
+        }, processGroupsData);
+
+        // initialize the grid
+        var processGroupsGrid = new Slick.Grid('#process-group-summary-table', processGroupsData, processGroupsColumnModel, processGroupsOptions);
+        processGroupsGrid.setSelectionModel(new Slick.RowSelectionModel());
+        processGroupsGrid.registerPlugin(new Slick.AutoTooltips());
+        processGroupsGrid.setSortColumn('name', true);
+        processGroupsGrid.onSort.subscribe(function (e, args) {
+            sort('process-group-summary-table', {
+                columnId: args.sortCol.field,
+                sortAsc: args.sortAsc
+            }, processGroupsData);
+        });
+
+        // configure a click listener
+        processGroupsGrid.onClick.subscribe(function (e, args) {
+            var target = $(e.target);
+
+            // get the node at this row
+            var item = processGroupsData.getItem(args.row);
+
+            // determine the desired action
+            if (processGroupsGrid.getColumns()[args.cell].id === 'actions') {
+                if (target.hasClass('go-to')) {
+                    if (nf.Common.isDefinedAndNotNull(parent.nf) && nf.Common.isDefinedAndNotNull(parent.nf.CanvasUtils) && nf.Common.isDefinedAndNotNull(parent.nf.Shell)) {
+                        parent.nf.CanvasUtils.enterGroup(item.id);
+                        parent.$('#shell-close-button').click();
+                    }
+                } else if (target.hasClass('show-cluster-process-group-status-history')) {
+                    nf.StatusHistory.showClusterProcessGroupChart(item.groupId, item.id);
+                } else if (target.hasClass('show-process-group-status-history')) {
+                    nf.StatusHistory.showStandaloneProcessGroupChart(item.groupId, item.id);
+                } else if (target.hasClass('show-cluster-process-group-summary')) {
+                    // load the cluster process group summary
+                    loadClusterProcessGroupSummary(item.id);
+
+                    // hide the summary loading indicator
+                    $('#summary-loading-container').hide();
+
+                    // show the dialog
+                    $('#cluster-process-group-summary-dialog').modal('show');
+                }
+            }
+        });
+
+        // wire up the dataview to the grid
+        processGroupsData.onRowCountChanged.subscribe(function (e, args) {
+            processGroupsGrid.updateRowCount();
+            processGroupsGrid.render();
+
+            // update the total number of displayed process groups if necessary
+            if ($('#process-group-summary-table').is(':visible')) {
+                $('#displayed-items').text(nf.Common.formatInteger(args.current));
+            }
+        });
+        processGroupsData.onRowsChanged.subscribe(function (e, args) {
+            processGroupsGrid.invalidateRows(args.rows);
+            processGroupsGrid.render();
+        });
+
+        // hold onto an instance of the grid
+        $('#process-group-summary-table').data('gridInstance', processGroupsGrid).on('mouseenter', 'div.slick-cell', function (e) {
+            var bulletinIcon = $(this).find('img.has-bulletins');
+            if (bulletinIcon.length && !bulletinIcon.data('qtip')) {
+                var processGroupId = $(this).find('span.row-id').text();
+
+                // get the status item
+                var item = processGroupsData.getItemById(processGroupId);
+
+                // format the tooltip
+                var bulletins = nf.Common.getFormattedBulletins(item.bulletins);
+                var tooltip = nf.Common.formatUnorderedList(bulletins);
+
+                // show the tooltip
+                if (nf.Common.isDefinedAndNotNull(tooltip)) {
+                    bulletinIcon.qtip($.extend({}, nf.Common.config.tooltipConfig, {
+                        content: tooltip,
+                        position: {
+                            container: $('#summary'),
+                            at: 'bottom right',
+                            my: 'top left',
+                            adjust: {
+                                x: 4,
+                                y: 4
+                            }
+                        }
+                    }));
+                }
+            }
+        });
+
+        // initialize the cluster process group summary dialog
+        $('#cluster-process-group-summary-dialog').modal({
+            headerText: 'Cluster Process Group Summary',
+            overlayBackground: false,
+            buttons: [{
+                    buttonText: 'Close',
+                    handler: {
+                        click: function () {
+                            // clear the cluster process group summary dialog
+                            $('#cluster-process-group-id').text('');
+                            $('#cluster-process-group-name').text('');
+
+                            // close the dialog
+                            this.modal('hide');
+                        }
+                    }
+                }],
+            handler: {
+                close: function () {
+                    // show the summary loading container
+                    $('#summary-loading-container').show();
+                }
+            }
+        });
+        
+        // cluster process group refresh
+        nf.Common.addHoverEffect('#cluster-process-group-refresh-button', 'button-refresh', 'button-refresh-hover').click(function () {
+            loadClusterProcessGroupSummary($('#cluster-process-group-id').text());
+        });
+
+        // initialize the cluster process groups column model
+        var clusterProcessGroupsColumnModel = [
+            {id: 'node', field: 'node', name: 'Node', sortable: true, resizable: true},
+            transferredColumn,
+            inputColumn,
+            ioColumn,
+            outputColumn,
+            sentColumn,
+            receivedColumn
+        ];
+
+        // initialize the options for the cluster process groups table
+        var clusterProcessGroupsOptions = {
+            forceFitColumns: true,
+            enableTextSelectionOnCells: true,
+            enableCellNavigation: true,
+            enableColumnReorder: false,
+            autoEdit: false,
+            multiSelect: false
+        };
+
+        // initialize the dataview
+        var clusterProcessGroupsData = new Slick.Data.DataView({
+            inlineFilters: false
+        });
+        clusterProcessGroupsData.setItems([]);
+
+        // initialize the sort
+        sort('cluster-process-group-summary-table', {
+            columnId: 'node',
+            sortAsc: true
+        }, clusterProcessGroupsData);
+
+        // initialize the grid
+        var clusterProcessGroupsGrid = new Slick.Grid('#cluster-process-group-summary-table', clusterProcessGroupsData, clusterProcessGroupsColumnModel, clusterProcessGroupsOptions);
+        clusterProcessGroupsGrid.setSelectionModel(new Slick.RowSelectionModel());
+        clusterProcessGroupsGrid.registerPlugin(new Slick.AutoTooltips());
+        clusterProcessGroupsGrid.setSortColumn('node', true);
+        clusterProcessGroupsGrid.onSort.subscribe(function (e, args) {
+            sort('cluster-process-group-summary-table', {
+                columnId: args.sortCol.field,
+                sortAsc: args.sortAsc
+            }, clusterProcessGroupsData);
+        });
+
+        // wire up the dataview to the grid
+        clusterProcessGroupsData.onRowCountChanged.subscribe(function (e, args) {
+            clusterProcessGroupsGrid.updateRowCount();
+            clusterProcessGroupsGrid.render();
+        });
+        clusterProcessGroupsData.onRowsChanged.subscribe(function (e, args) {
+            clusterProcessGroupsGrid.invalidateRows(args.rows);
+            clusterProcessGroupsGrid.render();
+        });
+
+        // hold onto an instance of the grid
+        $('#cluster-process-group-summary-table').data('gridInstance', clusterProcessGroupsGrid);
 
         // define the column model for the summary table
         var inputPortsColumnModel = [
-            {id: 'moreDetails', field: 'moreDetails', name: '&nbsp;', resizable: false, formatter: moreDetails, sortable: true, width: 50, maxWidth: 50},
+            moreDetailsColumn,
             nameColumn,
             runStatusColumn,
             outputColumn
@@ -917,18 +1192,18 @@ nf.SummaryTable = (function () {
 
                 // show the tooltip
                 if (nf.Common.isDefinedAndNotNull(tooltip)) {
-                    bulletinIcon.qtip($.extend({
+                    bulletinIcon.qtip($.extend({}, nf.Common.config.tooltipConfig, {
                         content: tooltip,
                         position: {
-                            target: 'mouse',
-                            viewport: $(window),
+                            container: $('#summary'),
+                            at: 'bottom right',
+                            my: 'top left',
                             adjust: {
-                                x: 8,
-                                y: 8,
-                                method: 'flipinvert flipinvert'
+                                x: 4,
+                                y: 4
                             }
                         }
-                    }, nf.Common.config.tooltipConfig));
+                    }));
                 }
             }
         });
@@ -1019,7 +1294,7 @@ nf.SummaryTable = (function () {
 
         // define the column model for the summary table
         var outputPortsColumnModel = [
-            {id: 'moreDetails', field: 'moreDetails', name: '&nbsp;', resizable: false, formatter: moreDetails, sortable: true, width: 50, maxWidth: 50},
+            moreDetailsColumn,
             nameColumn,
             runStatusColumn,
             inputColumn
@@ -1139,18 +1414,18 @@ nf.SummaryTable = (function () {
 
                 // show the tooltip
                 if (nf.Common.isDefinedAndNotNull(tooltip)) {
-                    bulletinIcon.qtip($.extend({
+                    bulletinIcon.qtip($.extend({}, nf.Common.config.tooltipConfig, {
                         content: tooltip,
                         position: {
-                            target: 'mouse',
-                            viewport: $(window),
+                            container: $('#summary'),
+                            at: 'bottom right',
+                            my: 'top left',
                             adjust: {
-                                x: 8,
-                                y: 8,
-                                method: 'flipinvert flipinvert'
+                                x: 4,
+                                y: 4
                             }
                         }
-                    }, nf.Common.config.tooltipConfig));
+                    }));
                 }
             }
         });
@@ -1266,8 +1541,6 @@ nf.SummaryTable = (function () {
 
         var transmissionStatusColumn = {id: 'transmissionStatus', field: 'transmissionStatus', name: 'Transmitting', formatter: transmissionStatusFormatter, sortable: true, resizable: true};
         var targetUriColumn = {id: 'targetUri', field: 'targetUri', name: 'Target URI', sortable: true, resizable: true};
-        var sentColumn = {id: 'sent', field: 'sent', name: '<span class="sent-title">Sent</span>&nbsp;/&nbsp;<span class="sent-size-title">Size</span>&nbsp;<span style="font-weight: normal; overflow: hidden;">5 min</span>', toolTip: 'Count / data size in the last 5 min', sortable: true, defaultSortAsc: false, resizable: true};
-        var receivedColumn = {id: 'received', field: 'received', name: '<span class="received-title">Received</span>&nbsp;/&nbsp;<span class="received-size-title">Size</span>&nbsp;<span style="font-weight: normal; overflow: hidden;">5 min</span>', toolTip: 'Count / data size in the last 5 min', sortable: true, defaultSortAsc: false, resizable: true};
 
         // define the column model for the summary table
         var remoteProcessGroupsColumnModel = [
@@ -1405,18 +1678,18 @@ nf.SummaryTable = (function () {
 
                 // show the tooltip
                 if (nf.Common.isDefinedAndNotNull(tooltip)) {
-                    bulletinIcon.qtip($.extend({
+                    bulletinIcon.qtip($.extend({}, nf.Common.config.tooltipConfig, {
                         content: tooltip,
                         position: {
-                            target: 'mouse',
-                            viewport: $(window),
+                            container: $('#summary'),
+                            at: 'bottom right',
+                            my: 'top left',
                             adjust: {
-                                x: 8,
-                                y: 8,
-                                method: 'flipinvert flipinvert'
+                                x: 4,
+                                y: 4
                             }
                         }
-                    }, nf.Common.config.tooltipConfig));
+                    }));
                 }
             }
         });
@@ -1607,7 +1880,7 @@ nf.SummaryTable = (function () {
                     var bQueueSize = nf.Common.parseSize(b['queuedSize']);
                     return aQueueSize - bQueueSize;
                 }
-            } else if (sortDetails.columnId === 'sent' || sortDetails.columnId === 'received' || sortDetails.columnId === 'input' || sortDetails.columnId === 'output') {
+            } else if (sortDetails.columnId === 'sent' || sortDetails.columnId === 'received' || sortDetails.columnId === 'input' || sortDetails.columnId === 'output' || sortDetails.columnId === 'transferred') {
                 var aSplit = a[sortDetails.columnId].split(/ \/ /);
                 var bSplit = b[sortDetails.columnId].split(/ \/ /);
                 var mod = sortState[tableId].count % 4;
@@ -1809,12 +2082,13 @@ nf.SummaryTable = (function () {
      * 
      * @argument {array} processorItems                 The processor data
      * @argument {array} connectionItems                The connection data
+     * @argument {array} processGroupItems              The process group data
      * @argument {array} inputPortItems                 The input port data
      * @argument {array} outputPortItems                 The output port data
      * @argument {array} remoteProcessGroupItems        The remote process group data
      * @argument {object} processGroupStatus            The process group status
      */
-    var populateProcessGroupStatus = function (processorItems, connectionItems, inputPortItems, outputPortItems, remoteProcessGroupItems, processGroupStatus) {
+    var populateProcessGroupStatus = function (processorItems, connectionItems, processGroupItems, inputPortItems, outputPortItems, remoteProcessGroupItems, processGroupStatus) {
         // add the processors to the summary grid
         $.each(processGroupStatus.processorStatus, function (i, procStatus) {
             processorItems.push(procStatus);
@@ -1839,10 +2113,13 @@ nf.SummaryTable = (function () {
         $.each(processGroupStatus.remoteProcessGroupStatus, function (i, rpgStatus) {
             remoteProcessGroupItems.push(rpgStatus);
         });
+        
+        // add the process group status as well
+        processGroupItems.push(processGroupStatus);
 
         // add any child group's status
         $.each(processGroupStatus.processGroupStatus, function (i, childProcessGroup) {
-            populateProcessGroupStatus(processorItems, connectionItems, inputPortItems, outputPortItems, remoteProcessGroupItems, childProcessGroup);
+            populateProcessGroupStatus(processorItems, connectionItems, processGroupItems, inputPortItems, outputPortItems, remoteProcessGroupItems, childProcessGroup);
         });
     };
 
@@ -1980,6 +2257,63 @@ nf.SummaryTable = (function () {
      * 
      * @argument {string} rowId     The row id
      */
+    var loadClusterProcessGroupSummary = function (rowId) {
+        // get the summary
+        $.ajax({
+            type: 'GET',
+            url: config.urls.clusterProcessGroup + encodeURIComponent(rowId) + '/status',
+            data: {
+                verbose: true
+            },
+            dataType: 'json'
+        }).done(function (response) {
+            if (nf.Common.isDefinedAndNotNull(response.clusterProcessGroupStatus)) {
+                var clusterProcessGroupStatus = response.clusterProcessGroupStatus;
+
+                var clusterProcessGroupsGrid = $('#cluster-process-group-summary-table').data('gridInstance');
+                var clusterProcessGroupsData = clusterProcessGroupsGrid.getData();
+
+                var clusterProcessGroups = [];
+
+                // populate the table
+                $.each(clusterProcessGroupStatus.nodeProcessGroupStatus, function (i, nodeProcessGroupStatus) {
+                    clusterProcessGroups.push({
+                        id: nodeProcessGroupStatus.node.nodeId,
+                        node: nodeProcessGroupStatus.node.address + ':' + nodeProcessGroupStatus.node.apiPort,
+                        activeThreadCount: nodeProcessGroupStatus.processGroupStatus.activeThreadCount,
+                        transferred: nodeProcessGroupStatus.processGroupStatus.transferred,
+                        input: nodeProcessGroupStatus.processGroupStatus.input,
+                        queued: nodeProcessGroupStatus.processGroupStatus.queued,
+                        queuedCount: nodeProcessGroupStatus.processGroupStatus.queuedCount,
+                        queuedSize: nodeProcessGroupStatus.processGroupStatus.queuedSize,
+                        output: nodeProcessGroupStatus.processGroupStatus.output,
+                        read: nodeProcessGroupStatus.processGroupStatus.read,
+                        written: nodeProcessGroupStatus.processGroupStatus.written,
+                        sent: nodeProcessGroupStatus.processGroupStatus.sent,
+                        received: nodeProcessGroupStatus.processGroupStatus.received
+                    });
+                });
+
+                // update the process groups
+                clusterProcessGroupsData.setItems(clusterProcessGroups);
+                clusterProcessGroupsData.reSort();
+                clusterProcessGroupsGrid.invalidate();
+
+                // populate the process group details
+                $('#cluster-process-group-name').text(clusterProcessGroupStatus.processGroupName).ellipsis();
+                $('#cluster-process-group-id').text(clusterProcessGroupStatus.processGroupId);
+
+                // update the stats last refreshed timestamp
+                $('#cluster-process-group-summary-last-refreshed').text(clusterProcessGroupStatus.statsLastRefreshed);
+            }
+        }).fail(nf.Common.handleAjaxError);
+    };
+
+    /**
+     * Loads the cluster input port details dialog for the specified input port.
+     * 
+     * @argument {string} rowId     The row id
+     */
     var loadClusterInputPortSummary = function (rowId) {
         // get the summary
         $.ajax({
@@ -2126,14 +2460,17 @@ nf.SummaryTable = (function () {
     };
 
     return {
+        
         /**
          * URL for loading system diagnostics.
          */
         systemDiagnosticsUrl: null,
+        
         /**
          * URL for loading the summary.
          */
         url: null,
+        
         /**
          * Initializes the status table.
          * 
@@ -2157,6 +2494,7 @@ nf.SummaryTable = (function () {
                 });
             }).promise();
         },
+        
         /**
          * Update the size of the grid based on its container's current size.
          */
@@ -2170,6 +2508,11 @@ nf.SummaryTable = (function () {
             if (nf.Common.isDefinedAndNotNull(connectionsGrid)) {
                 connectionsGrid.resizeCanvas();
             }
+            
+            var processGroupsGrid = $('#process-group-summary-table').data('gridInstance');
+            if (nf.Common.isDefinedAndNotNull(processGroupsGrid)) {
+                processGroupsGrid.resizeCanvas();
+            }
 
             var inputPortGrid = $('#input-port-summary-table').data('gridInstance');
             if (nf.Common.isDefinedAndNotNull(inputPortGrid)) {
@@ -2186,6 +2529,7 @@ nf.SummaryTable = (function () {
                 remoteProcessGroupGrid.resizeCanvas();
             }
         },
+        
         /**
          * Load the processor status table.
          */
@@ -2212,6 +2556,14 @@ nf.SummaryTable = (function () {
                     // get the connections grid/data (do not render bulletins)
                     var connectionsGrid = $('#connection-summary-table').data('gridInstance');
                     var connectionsData = connectionsGrid.getData();
+                    
+                    // remove any tooltips from the process group table
+                    var processGroupGridElement = $('#process-group-summary-table');
+                    nf.Common.cleanUpTooltips(processGroupGridElement, 'img.has-bulletins');
+
+                    // get the process group grid/data
+                    var processGroupGrid = processGroupGridElement.data('gridInstance');
+                    var processGroupData = processGroupGrid.getData();
 
                     // remove any tooltips from the input port table
                     var inputPortsGridElement = $('#input-port-summary-table');
@@ -2239,12 +2591,13 @@ nf.SummaryTable = (function () {
 
                     var processorItems = [];
                     var connectionItems = [];
+                    var processGroupItems = [];
                     var inputPortItems = [];
                     var outputPortItems = [];
                     var remoteProcessGroupItems = [];
 
                     // populate the tables
-                    populateProcessGroupStatus(processorItems, connectionItems, inputPortItems, outputPortItems, remoteProcessGroupItems, processGroupStatus);
+                    populateProcessGroupStatus(processorItems, connectionItems, processGroupItems, inputPortItems, outputPortItems, remoteProcessGroupItems, processGroupStatus);
 
                     // update the processors
                     processorsData.setItems(processorItems);
@@ -2255,6 +2608,11 @@ nf.SummaryTable = (function () {
                     connectionsData.setItems(connectionItems);
                     connectionsData.reSort();
                     connectionsGrid.invalidate();
+                    
+                    // update the process groups
+                    processGroupData.setItems(processGroupItems);
+                    processGroupData.reSort();
+                    processGroupGrid.invalidate();
 
                     // update the input ports
                     inputPortsData.setItems(inputPortItems);


[4/7] incubator-nifi git commit: NIFI-527: More performance improvements, including reusing IndexSearchers and IndexWriters

Posted by ma...@apache.org.
NIFI-527: More performance improvements, including reusing IndexSearchers and IndexWriters


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/9a9973af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/9a9973af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/9a9973af

Branch: refs/heads/improve-prov-performance
Commit: 9a9973af8dc4a14d4dc33f08f6084bb49f8856ce
Parents: a1027ae
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Apr 21 09:45:46 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Apr 21 09:45:46 2015 -0400

----------------------------------------------------------------------
 .../PersistentProvenanceRepository.java         | 52 +++++++++++--
 .../provenance/lucene/DeleteIndexAction.java    |  6 +-
 .../nifi/provenance/lucene/DocsReader.java      | 20 +++--
 .../nifi/provenance/lucene/IndexManager.java    | 78 ++++++++++++++++----
 .../nifi/provenance/lucene/IndexSearch.java     |  5 ++
 .../nifi/provenance/toc/StandardTocWriter.java  | 27 -------
 .../TestStandardRecordReaderWriter.java         |  9 +++
 .../provenance/toc/TestStandardTocWriter.java   | 22 ------
 8 files changed, 142 insertions(+), 77 deletions(-)
----------------------------------------------------------------------

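The heart of this commit's IndexWriter reuse is a reference-counted pool keyed by index directory: the first borrower creates the writer, later borrowers share the same instance, and the writer is closed only when the last borrower returns it. The sketch below shows that pattern in isolation; the names WriterPool, Factory, borrow, and release are illustrative only, not NiFi's API, and the real logic lives in IndexManager in the diff that follows.

    import java.io.Closeable;
    import java.io.File;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    // Minimal sketch of a reference-counted resource pool, in the spirit of
    // IndexManager's writer reuse. All names here are illustrative, not NiFi's API.
    public class WriterPool<W extends Closeable> {

        // factory for creating the expensive resource on first borrow
        public interface Factory<W> {
            W create(File directory) throws IOException;
        }

        private static class Entry<W> {
            final W writer;
            int count;

            Entry(final W writer) {
                this.writer = writer;
                this.count = 1;
            }
        }

        private final Lock lock = new ReentrantLock();
        private final Map<File, Entry<W>> entries = new HashMap<>();
        private final Factory<W> factory;

        public WriterPool(final Factory<W> factory) {
            this.factory = factory;
        }

        public W borrow(final File directory) throws IOException {
            final File key = directory.getAbsoluteFile();
            lock.lock();
            try {
                Entry<W> entry = entries.get(key);
                if (entry == null) {
                    // first borrower: create the resource and start the count at 1
                    entry = new Entry<>(factory.create(key));
                    entries.put(key, entry);
                } else {
                    // later borrowers share the same instance
                    entry.count++;
                }
                return entry.writer;
            } finally {
                lock.unlock();
            }
        }

        public void release(final File directory) throws IOException {
            final File key = directory.getAbsoluteFile();
            lock.lock();
            try {
                final Entry<W> entry = entries.get(key);
                if (entry == null) {
                    return; // not borrowed from this pool; nothing to do
                }
                if (--entry.count <= 0) {
                    // last borrower: close and forget the shared resource
                    entries.remove(key);
                    entry.writer.close();
                }
            } finally {
                lock.unlock();
            }
        }
    }

Note also how the diff changes borrowIndexWriter and returnIndexWriter to take the index directory itself rather than re-deriving it from the journal file on each call, so the pool key is computed once by the caller.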

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a9973af/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index f5c595d..7bf7f8a 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -181,7 +181,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         this.maxPartitionMillis = configuration.getMaxEventFileLife(TimeUnit.MILLISECONDS);
         this.maxPartitionBytes = configuration.getMaxEventFileCapacity();
         this.indexConfig = new IndexConfiguration(configuration);
-        this.indexManager = new IndexManager(indexConfig);
+        this.indexManager = new IndexManager();
         this.alwaysSync = configuration.isAlwaysSync();
         this.rolloverCheckMillis = rolloverCheckMillis;
         
@@ -192,7 +192,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             indexingAction = null;
         }
 
-        scheduledExecService = Executors.newScheduledThreadPool(3);
+        scheduledExecService = Executors.newScheduledThreadPool(3, new NamedThreadFactory("Provenance Maintenance Thread"));
         queryExecService = Executors.newFixedThreadPool(configuration.getQueryThreadPoolSize(), new NamedThreadFactory("Provenance Query Thread"));
 
         // The number of rollover threads is a little bit arbitrary but comes from the idea that multiple storage directories generally
@@ -905,6 +905,21 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         }
     }
 
+    
+    private int getJournalCount() {
+    	// determine how many 'journals' we have in the journals directories
+        int journalFileCount = 0;
+        for ( final File storageDir : configuration.getStorageDirectories() ) {
+        	final File journalsDir = new File(storageDir, "journals");
+        	final File[] journalFiles = journalsDir.listFiles();
+        	if ( journalFiles != null ) {
+        		journalFileCount += journalFiles.length;
+        	}
+        }
+        
+        return journalFileCount;
+    }
+    
     /**
      * MUST be called with the write lock held
      *
@@ -936,6 +951,32 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             	logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
             }
 
+            int journalFileCount = getJournalCount();
+            final int journalCountThreshold = configuration.getJournalCount() * 5;
+            if ( journalFileCount > journalCountThreshold ) {
+            	logger.warn("The rate of the dataflow is exceeding the provenance recording rate. "
+            			+ "Slowing down flow to accomodate. Currently, there are {} journal files and "
+            			+ "Slowing down flow to accommodate. Currently, there are {} journal files and "
+            			+ "the threshold for blocking is {}", journalFileCount, journalCountThreshold);
+            			+ "exceeding the provenance recording rate. Slowing down flow to accommodate");
+            	
+            	while (journalFileCount > journalCountThreshold) {
+            		try {
+            			Thread.sleep(1000L);
+            		} catch (final InterruptedException ie) {
+            		}
+            		
+                	logger.debug("Provenance Repository is still behind. Keeping flow slowed down "
+                			+ "to accommodate. Currently, there are {} journal files and "
+                			+ "the threshold for blocking is {}", journalFileCount, journalCountThreshold);
+
+            		journalFileCount = getJournalCount();
+            	}
+            	
+            	logger.info("Provenance Repository has now caught up with rolling over journal files. Current number of "
+            			+ "journal files to be rolled over is {}", journalFileCount);
+            }
+            
             writers = createWriters(configuration, idGenerator.get());
             streamStartTime.set(System.currentTimeMillis());
             recordsWrittenSinceRollover.getAndSet(0);
@@ -1205,7 +1246,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
                 final IndexingAction indexingAction = new IndexingAction(this, indexConfig);
                 
-                final IndexWriter indexWriter = indexManager.borrowIndexWriter(writerFile);
+                final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile);
+                final IndexWriter indexWriter = indexManager.borrowIndexWriter(indexingDirectory);
                 try {
 	                while (!recordToReaderMap.isEmpty()) {
 	                    final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
@@ -1240,7 +1282,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 	                
 	                indexWriter.commit();
                 } finally {
-                	indexManager.returnIndexWriter(writerFile, indexWriter);
+                	indexManager.returnIndexWriter(indexingDirectory, indexWriter);
                 }
             }
         } finally {
@@ -1784,7 +1826,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                             query, indexDir, queryResult.getQueryTime(), queryResult.getTotalHitCount());
                 }
             } catch (final Throwable t) {
-                logger.error("Failed to query provenance repository due to {}", t.toString());
+                logger.error("Failed to query Provenance Repository Index {} due to {}", indexDir, t.toString());
                 if (logger.isDebugEnabled()) {
                     logger.error("", t);
                 }

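The journal-count check added above is a plain backpressure loop: when the number of journal files across the storage directories exceeds five times the configured journal count, the rollover path warns and then blocks in one-second sleeps until the backlog drains, slowing the flow down. A stripped-down sketch of that loop, with an illustrative helper name that is not NiFi code:

    import java.util.function.IntSupplier;

    // Minimal sketch of the backpressure loop added to PersistentProvenanceRepository
    // above; the class name and signature are illustrative, not NiFi's API.
    public final class Backpressure {

        private Backpressure() {
        }

        // Blocks while backlog() exceeds threshold, polling once per second,
        // and returns the backlog size observed when the loop exits.
        public static int awaitBelow(final IntSupplier backlog, final int threshold) throws InterruptedException {
            int current = backlog.getAsInt();
            while (current > threshold) {
                Thread.sleep(1000L); // give the rollover/merge threads time to catch up
                current = backlog.getAsInt();
            }
            return current;
        }
    }

In the commit itself the backlog is getJournalCount() summed across all storage directories and the threshold is configuration.getJournalCount() * 5; both the 5x multiplier and the one-second poll interval are hard-coded choices of this change.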
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a9973af/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
index e3bf4d6..7db04aa 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
@@ -50,12 +50,14 @@ public class DeleteIndexAction implements ExpirationAction {
         long maxEventId = -1L;
         try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, repository.getAllLogFiles())) {
         	maxEventId = reader.getMaxEventId();
+        } catch (final IOException ioe) {
+        	logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath());
         }
 
         // remove the records from the index
         final List<File> indexDirs = indexConfiguration.getIndexDirectories(expiredFile);
         for (final File indexingDirectory : indexDirs) {
-            Term term = new Term(FieldNames.STORAGE_FILENAME, LuceneUtil.substringBefore(expiredFile.getName(), "."));
+            final Term term = new Term(FieldNames.STORAGE_FILENAME, LuceneUtil.substringBefore(expiredFile.getName(), "."));
 
             boolean deleteDir = false;
             final IndexWriter writer = indexManager.borrowIndexWriter(indexingDirectory);
@@ -71,8 +73,8 @@ public class DeleteIndexAction implements ExpirationAction {
 
             // we've confirmed that all documents have been removed. Delete the index directory.
             if (deleteDir) {
+            	indexManager.removeIndex(indexingDirectory);
                 indexConfiguration.removeIndexDirectory(indexingDirectory);
-                indexManager.removeIndex(indexingDirectory);
                 
                 deleteDirectory(indexingDirectory);
                 logger.info("Removed empty index directory {}", indexingDirectory);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a9973af/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
index 93978cd..5a77f42 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
@@ -23,6 +23,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
@@ -118,9 +119,15 @@ public class DocsReader {
 
         final long start = System.nanoTime();
         int logFileCount = 0;
+        
+        final Set<String> storageFilesToSkip = new HashSet<>();
+        
         try {
             for (final Document d : docs) {
                 final String storageFilename = d.getField(FieldNames.STORAGE_FILENAME).stringValue();
+                if ( storageFilesToSkip.contains(storageFilename) ) {
+                	continue;
+                }
                 
                 try {
                     if (reader != null && storageFilename.equals(lastStorageFilename)) {
@@ -135,20 +142,23 @@ public class DocsReader {
 
                         List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
                         if (potentialFiles.isEmpty()) {
-                            throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository");
+                            logger.warn("Could not find Provenance Log File with basename {} in the "
+                            		+ "Provenance Repository; assuming file has expired and continuing without it", storageFilename);
+                            storageFilesToSkip.add(storageFilename);
+                            continue;
                         }
 
                         if (potentialFiles.size() > 1) {
-                            throw new FileNotFoundException("Found multiple Provenance Log Files with basename " + storageFilename + " in the Provenance Repository");
+                            throw new FileNotFoundException("Found multiple Provenance Log Files with basename " + 
+                            		storageFilename + " in the Provenance Repository");
                         }
 
                         for (final File file : potentialFiles) {
-                            reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles);
-
                             try {
+                            	reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles);
                                	matchingRecords.add(getRecord(d, reader));
                             } catch (final IOException e) {
-                                throw new IOException("Failed to retrieve record from Provenance File " + file + " due to " + e, e);
+                                throw new IOException("Failed to retrieve record " + d + " from Provenance File " + file + " due to " + e, e);
                             }
                         }
                     }

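The DocsReader change above softens a hard failure: previously a single Lucene Document that referenced an expired provenance log aborted the whole query with a FileNotFoundException, whereas now the missing basename is logged once, remembered in a set, and every later hit against it is skipped. The same pattern in isolation, with illustrative names that are not NiFi's API:

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.function.Function;

    // Minimal sketch of the "skip known-missing sources" pattern from the
    // DocsReader change above; names and signatures are illustrative only.
    public final class SkipMissing {

        private SkipMissing() {
        }

        public static <T, R> List<R> resolveAll(final List<T> hits,
                final Function<T, String> sourceOf, final Function<T, R> resolve) {
            final Set<String> missingSources = new HashSet<>();
            final List<R> results = new ArrayList<>();

            for (final T hit : hits) {
                final String source = sourceOf.apply(hit);
                if (missingSources.contains(source)) {
                    continue; // source already known to be gone; skip without retrying
                }
                try {
                    results.add(resolve.apply(hit));
                } catch (final RuntimeException e) {
                    // treat the first failure as "source expired": remember it and move on
                    missingSources.add(source);
                }
            }
            return results;
        }
    }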
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a9973af/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
index 19c5b75..0d93f3b 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
@@ -21,9 +21,11 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -35,25 +37,21 @@ import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
-import org.apache.nifi.provenance.IndexConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class IndexManager implements Closeable {
 	private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);
-	private final IndexConfiguration indexConfig;
 	
 	private final Lock lock = new ReentrantLock();
 	private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
 	private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
 	
-	public IndexManager(final IndexConfiguration indexConfig) {
-		this.indexConfig = indexConfig;
-	}
 	
 	public void removeIndex(final File indexDirectory) {
 		final File absoluteFile = indexDirectory.getAbsoluteFile();
-
+		logger.info("Removing index {}", indexDirectory);
+		
 		lock.lock();
 		try {
 			final IndexWriterCount count = writerCounts.remove(absoluteFile);
@@ -86,15 +84,14 @@ public class IndexManager implements Closeable {
 		}
 	}
 	
-	public IndexWriter borrowIndexWriter(final File journalFile) throws IOException {
-		final File indexingDirectory = indexConfig.getWritableIndexDirectory(journalFile);
+	public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
 		final File absoluteFile = indexingDirectory.getAbsoluteFile();
-
+		logger.debug("Borrowing index writer for {}", indexingDirectory);
+		
 		lock.lock();
 		try {
 			IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
 			if ( writerCount == null ) {
-				
 				final List<Closeable> closeables = new ArrayList<>();
                 final Directory directory = FSDirectory.open(indexingDirectory);
                 closeables.add(directory);
@@ -108,6 +105,7 @@ public class IndexManager implements Closeable {
 
                     final IndexWriter indexWriter = new IndexWriter(directory, config);
                     writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1);
+                    logger.debug("Providing new index writer for {}", indexingDirectory);
                 } catch (final IOException ioe) {
                 	for ( final Closeable closeable : closeables ) {
                 		try {
@@ -122,6 +120,7 @@ public class IndexManager implements Closeable {
                 
                 writerCounts.put(absoluteFile, writerCount);
 			} else {
+				logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
 				writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
 						writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
 			}
@@ -132,9 +131,9 @@ public class IndexManager implements Closeable {
 		}
 	}
 	
-	public void returnIndexWriter(final File journalFile, final IndexWriter writer) {
-		final File indexingDirectory = indexConfig.getWritableIndexDirectory(journalFile);
+	public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
 		final File absoluteFile = indexingDirectory.getAbsoluteFile();
+		logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory);
 		
 		lock.lock();
 		try {
@@ -142,13 +141,15 @@ public class IndexManager implements Closeable {
 			
 			try {
 				if ( count == null ) {
-					logger.warn("Index Writer {} was returned to IndexManager for Jornal File {}, but this writer is not known. This could potentially lead to a resource leak", writer, journalFile);
+					logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. This could potentially lead to a resource leak", writer, indexingDirectory);
 					writer.close();
 				} else if ( count.getCount() <= 1 ) {
 					// we are finished with this writer.
+					logger.debug("Closing Index Writer for {}", indexingDirectory);
 					count.close();
 				} else {
 					// decrement the count.
+					logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
 					writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
 				}
 			} catch (final IOException ioe) {
@@ -165,6 +166,7 @@ public class IndexManager implements Closeable {
 	
 	public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
 		final File absoluteFile = indexDir.getAbsoluteFile();
+		logger.debug("Borrowing index searcher for {}", indexDir);
 		
 		lock.lock();
 		try {
@@ -174,9 +176,38 @@ public class IndexManager implements Closeable {
 				currentlyCached = new ArrayList<>();
 				activeSearchers.put(absoluteFile, currentlyCached);
 			} else {
-				for ( final ActiveIndexSearcher searcher : currentlyCached ) {
-					if ( searcher.isCache() ) {
-						return searcher.getSearcher();
+				// keep track of any searchers that have been closed so that we can remove them
+				// from our cache later.
+				final Set<ActiveIndexSearcher> expired = new HashSet<>();
+				
+				try {
+					for ( final ActiveIndexSearcher searcher : currentlyCached ) {
+						if ( searcher.isCache() ) {
+							final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
+							if ( refCount <= 0 ) {
+								// if refCount == 0, then the reader has been closed, so we need to discard the
+								// searcher.
+								logger.debug("Reference count for cached Index Searcher {} is currently {}; "
+									+ "removing cached searcher", searcher.getSearcher().getIndexReader(), refCount);
+								expired.add(searcher);
+								continue;
+							}
+							
+							logger.debug("Providing previously cached index searcher for {}", indexDir);
+							return searcher.getSearcher();
+						}
+					}
+				} finally {
+					// if we have any expired index searchers, we need to close them and remove them
+					// from the cache so that we don't try to use them again later.
+					for ( final ActiveIndexSearcher searcher : expired ) {
+						try {
+							searcher.close();
+						} catch (final Exception e) {
+							logger.debug("Failed to close 'expired' IndexSearcher {}", searcher, e);
+						}
+						
+						currentlyCached.remove(searcher);
 					}
 				}
 			}
@@ -184,6 +215,7 @@ public class IndexManager implements Closeable {
 			IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
 			if ( writerCount == null ) {
 				final Directory directory = FSDirectory.open(absoluteFile);
+				logger.debug("No Index Writer currently exists for {}; creating a cacheable reader", indexDir);
 				
 				try {
 					final DirectoryReader directoryReader = DirectoryReader.open(directory);
@@ -204,6 +236,9 @@ public class IndexManager implements Closeable {
 					throw e;
 				}
 			} else {
+				logger.debug("Index Writer currently exists for {}; creating a non-cacheable reader and incrementing "
+						+ "counter to {}", indexDir, writerCount.getCount() + 1);
+
 				// increment the writer count to ensure that it's kept open.
 				writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
 						writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
@@ -230,6 +265,7 @@ public class IndexManager implements Closeable {
 	
 	public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
 		final File absoluteFile = indexDirectory.getAbsoluteFile();
+		logger.debug("Returning index searcher for {} to IndexManager", indexDirectory);
 		
 		lock.lock();
 		try {
@@ -247,6 +283,7 @@ public class IndexManager implements Closeable {
 				if ( activeSearcher.getSearcher().equals(searcher) ) {
 					if ( activeSearcher.isCache() ) {
 						// the searcher is cached. Just leave it open.
+						logger.debug("Index searcher for {} is cached; leaving open", indexDirectory);
 						return;
 					} else {
 						// searcher is not cached. It was created from a writer, and we want
@@ -259,6 +296,9 @@ public class IndexManager implements Closeable {
 						if ( writerCount != null ) {
 							if ( writerCount.getCount() <= 1 ) {
 								try {
+									logger.debug("Index searcher for {} is not cached. Writer count is "
+											+ "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1);
+									
 									writerCount.close();
 								} catch (final IOException ioe) {
 									logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
@@ -267,6 +307,9 @@ public class IndexManager implements Closeable {
 									}
 								}
 							} else {
+								logger.debug("Index searcher for {} is not cached. Writer count is decremented "
+										+ "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);
+								
 								writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
 									writerCount.getAnalyzer(), writerCount.getDirectory(), 
 									writerCount.getCount() - 1));
@@ -274,6 +317,7 @@ public class IndexManager implements Closeable {
 						}
 
 						try {
+							logger.debug("Closing Index Searcher for {}", indexDirectory);
 							activeSearcher.close();
 						} catch (final IOException ioe) {
 							logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
@@ -291,6 +335,8 @@ public class IndexManager implements Closeable {
 	
 	@Override
 	public void close() throws IOException {
+		logger.debug("Closing Index Manager");
+		
 		lock.lock();
 		try {
 			IOException ioe = null;

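The borrowIndexWriter/returnIndexWriter pair above implements manual reference counting: all access is serialized by a single lock, the count is bumped on each borrow, and the underlying IndexWriter is closed only when the last borrower returns it. A minimal, self-contained sketch of the same pattern follows; the RefCountingPool, Factory, and Holder names are illustrative and do not appear in NiFi.

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class RefCountingPool<K, V extends Closeable> {

        public interface Factory<A, B> {
            B create(A key) throws IOException;
        }

        private final Lock lock = new ReentrantLock();
        private final Map<K, Holder> holders = new HashMap<>();
        private final Factory<K, V> factory;

        public RefCountingPool(final Factory<K, V> factory) {
            this.factory = factory;
        }

        // Borrow the shared resource for the given key, creating it on first
        // use and incrementing its reference count on every borrow.
        public V borrow(final K key) throws IOException {
            lock.lock();
            try {
                Holder holder = holders.get(key);
                if (holder == null) {
                    holder = new Holder(factory.create(key));
                    holders.put(key, holder);
                }
                holder.count++;
                return holder.value;
            } finally {
                lock.unlock();
            }
        }

        // Return a borrowed resource; the last return closes it, mirroring how
        // returnIndexWriter closes the IndexWriter when the count reaches zero.
        public void release(final K key) throws IOException {
            lock.lock();
            try {
                final Holder holder = holders.get(key);
                if (holder == null) {
                    return; // unknown key: nothing is tracked for it
                }
                if (--holder.count <= 0) {
                    holders.remove(key);
                    holder.value.close();
                }
            } finally {
                lock.unlock();
            }
        }

        private class Holder {
            private final V value;
            private int count;

            Holder(final V value) {
                this.value = value;
            }
        }
    }
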
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a9973af/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
index f6723cf..ec16a52 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
@@ -97,4 +97,9 @@ public class IndexSearch {
         }
     }
 
+    
+    @Override
+    public String toString() {
+    	return "IndexSearcher[" + indexDirectory + "]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a9973af/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
index 17f1a59..488f225 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
@@ -16,18 +16,12 @@
  */
 package org.apache.nifi.provenance.toc;
 
-import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.EOFException;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 
 import org.slf4j.Logger;
@@ -61,27 +55,6 @@ public class StandardTocWriter implements TocWriter {
      * @throws FileNotFoundException 
      */
     public StandardTocWriter(final File file, final boolean compressionFlag, final boolean alwaysSync) throws IOException {
-        if ( file.exists() ) {
-            // Check if the header actually exists. If so, throw FileAlreadyExistsException
-            // If no data is in the file, we will just overwrite it.
-            try (final InputStream fis = new FileInputStream(file);
-                 final InputStream bis = new BufferedInputStream(fis);
-                 final DataInputStream dis = new DataInputStream(bis)) {
-                dis.read();
-                dis.read();
-
-                // we always add the first offset when the writer is created so we allow this to exist.
-                dis.readLong();
-                final int nextByte = dis.read();
-                
-                if ( nextByte > -1 ) {
-                    throw new FileAlreadyExistsException(file.getAbsolutePath());
-                }
-            } catch (final EOFException eof) {
-                // no real data. overwrite file.
-            }
-        }
-        
         final File tocDir = file.getParentFile();
         if ( !tocDir.exists() ) {
         	Files.createDirectories(tocDir.toPath());

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a9973af/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
index 3b0693a..6f85b94 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
@@ -33,6 +33,7 @@ import org.apache.nifi.provenance.toc.StandardTocWriter;
 import org.apache.nifi.provenance.toc.TocReader;
 import org.apache.nifi.provenance.toc.TocUtil;
 import org.apache.nifi.provenance.toc.TocWriter;
+import org.apache.nifi.util.file.FileUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -82,6 +83,8 @@ public class TestStandardRecordReaderWriter {
         	assertEquals("nifi://unit-test", recovered.getTransitUri());
         	assertNull(reader.nextRecord());
         }
+        
+        FileUtils.deleteFile(journalFile.getParentFile(), true);
 	}
 	
 	
@@ -108,6 +111,8 @@ public class TestStandardRecordReaderWriter {
         	assertEquals("nifi://unit-test", recovered.getTransitUri());
         	assertNull(reader.nextRecord());
         }
+        
+        FileUtils.deleteFile(journalFile.getParentFile(), true);
 	}
 	
 	
@@ -145,6 +150,8 @@ public class TestStandardRecordReaderWriter {
         	
         	assertNull(reader.nextRecord());
         }
+        
+        FileUtils.deleteFile(journalFile.getParentFile(), true);
 	}
 	
 	
@@ -176,5 +183,7 @@ public class TestStandardRecordReaderWriter {
         	
         	assertNull(reader.nextRecord());
         }
+        
+        FileUtils.deleteFile(journalFile.getParentFile(), true);
 	}
 }

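Each of the tests above gains the same FileUtils.deleteFile(journalFile.getParentFile(), true) cleanup as its final statement. In JUnit 4 this kind of repeated teardown is often centralized in an @After method instead; a sketch, assuming the journal file were promoted to a field of the test class (the field and method names here are illustrative, and org.junit.After would also need to be imported):

    private File journalFile; // assigned at the start of each test

    @After
    public void deleteJournal() throws IOException {
        if (journalFile != null) {
            // recursively remove the per-test journal directory
            FileUtils.deleteFile(journalFile.getParentFile(), true);
        }
    }
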
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9a9973af/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
index 267d053..70f55a2 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
@@ -20,11 +20,9 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.FileAlreadyExistsException;
 import java.util.UUID;
 
 import org.apache.nifi.util.file.FileUtils;
-import org.junit.Assert;
 import org.junit.Test;
 
 public class TestStandardTocWriter {
@@ -41,24 +39,4 @@ public class TestStandardTocWriter {
         }
     }
     
-    @Test
-    public void testDoNotOverwriteNonEmptyFile() throws IOException {
-        final File tocFile = new File("target/" + UUID.randomUUID().toString() + ".toc");
-        try {
-            assertTrue( tocFile.createNewFile() );
-            
-            try (final StandardTocWriter writer = new StandardTocWriter(tocFile, false, false)) {
-                writer.addBlockOffset(0L);
-                writer.addBlockOffset(34L);
-            }
-            
-            try (final StandardTocWriter writer = new StandardTocWriter(tocFile, false, false)) {
-                Assert.fail("StandardTocWriter attempted to overwrite existing file");
-            } catch (final FileAlreadyExistsException faee) {
-                // expected
-            }
-        } finally {
-            FileUtils.deleteFile(tocFile, false);
-        }
-    }
 }


[6/7] incubator-nifi git commit: NIFI-527: Added unit test to verify backpressure

Posted by ma...@apache.org.
NIFI-527: Added unit test to verify backpressure


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/4dafd65a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/4dafd65a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/4dafd65a

Branch: refs/heads/improve-prov-performance
Commit: 4dafd65a2e9296af7a229026fdf99e28b9c53f37
Parents: 2f7b2ef
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Apr 21 11:11:58 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Apr 21 11:11:58 2015 -0400

----------------------------------------------------------------------
 .../PersistentProvenanceRepository.java         | 10 +--
 .../TestPersistentProvenanceRepository.java     | 66 ++++++++++++++++++++
 2 files changed, 72 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4dafd65a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 7bf7f8a..ab3c2df 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -608,8 +608,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
             indexManager.close();
             
-            for (final RecordWriter writer : writers) {
-                writer.close();
+            if ( writers != null ) {
+	            for (final RecordWriter writer : writers) {
+	                writer.close();
+	            }
             }
         } finally {
             writeLock.unlock();
@@ -905,8 +907,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         }
     }
 
-    
-    private int getJournalCount() {
+    // made protected for testing purposes
+    protected int getJournalCount() {
     	// determine how many 'journals' we have in the journals directories
         int journalFileCount = 0;
         for ( final File storageDir : configuration.getStorageDirectories() ) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4dafd65a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 7822e5c..25a363f 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -32,6 +32,8 @@ import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.core.SimpleAnalyzer;
@@ -965,6 +967,70 @@ public class TestPersistentProvenanceRepository {
         assertEquals(0, storageDirFiles.length);
     }
     
+    
+    @Test
+    public void testBackPressure() throws IOException, InterruptedException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxEventFileCapacity(1L);	// force rollover on each record.
+        config.setJournalCount(1);
+        
+        final AtomicInteger journalCountRef = new AtomicInteger(0);
+        
+    	repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
+    		@Override
+    		protected int getJournalCount() {
+    			return journalCountRef.get();
+    		}
+    	};
+        repo.initialize(getEventReporter());
+
+    	final Map<String, String> attributes = new HashMap<>();
+    	final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", UUID.randomUUID().toString());
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        // ensure that we can register the events.
+        for (int i = 0; i < 10; i++) {
+            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+            attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
+            repo.registerEvent(builder.build());
+        }
+
+        // set number of journals to 6 so that we will block.
+        journalCountRef.set(6);
+
+        final AtomicLong threadNanos = new AtomicLong(0L);
+        final Thread t = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				final long start = System.nanoTime();
+		        builder.fromFlowFile(createFlowFile(13, 3000L, attributes));
+		        attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 13);
+		        repo.registerEvent(builder.build());
+		        threadNanos.set(System.nanoTime() - start);
+			}
+        });
+        t.start();
+
+        Thread.sleep(1500L);
+        
+        journalCountRef.set(1);
+        t.join();
+        
+        final int threadMillis = (int) TimeUnit.NANOSECONDS.toMillis(threadNanos.get());
+        assertTrue(threadMillis > 1200);	// use 1200 to account for the fact that the timing is not exact
+        
+        builder.fromFlowFile(createFlowFile(15, 3000L, attributes));
+        attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 15);
+        repo.registerEvent(builder.build());
+    }
+    
+    
     // TODO: test EOF on merge
     // TODO: Test journal with no records
 

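The test hinges on registerEvent() blocking while the number of outstanding journal files exceeds the configured journal count, and resuming once rollover brings the count back down (here simulated by dropping journalCountRef from 6 back to 1 after 1.5 seconds). A rough sketch of that style of backpressure gate is shown below; the class, method name, and polling interval are illustrative, not the repository's actual code.

    import java.util.concurrent.atomic.AtomicInteger;

    public class BackpressureGateSketch {
        // stand-in for the repository's journal bookkeeping
        private final AtomicInteger journalCount = new AtomicInteger(0);

        protected int getJournalCount() {
            return journalCount.get();
        }

        // Illustrative only: block event writers while too many journal files
        // await merging, polling until the rollover thread catches up.
        protected void waitForRollover(final int maxJournalFiles) {
            while (getJournalCount() > maxJournalFiles) {
                try {
                    Thread.sleep(100L); // re-check the journal count periodically
                } catch (final InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
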

[3/7] incubator-nifi git commit: NIFI-527: Refactored the serialization format of the persistent prov repo to use compression blocks and index them

Posted by ma...@apache.org.
NIFI-527: Refactored the serialization format of the persistent prov repo to use compression blocks and index them


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a1027aea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a1027aea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a1027aea

Branch: refs/heads/improve-prov-performance
Commit: a1027aeae51826ca22234e373ecd207c66e72ab7
Parents: ce5a654
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Apr 20 13:37:22 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Apr 20 13:37:22 2015 -0400

----------------------------------------------------------------------
 .../nifi/stream/io/ByteCountingInputStream.java |   5 +
 .../stream/io/ByteCountingOutputStream.java     |   6 +
 .../PersistentProvenanceRepository.java         | 501 +++++++++----------
 .../provenance/RepositoryConfiguration.java     |  14 +-
 .../nifi/provenance/StandardRecordReader.java   | 223 ++++++++-
 .../nifi/provenance/StandardRecordWriter.java   | 114 ++++-
 .../provenance/lucene/DeleteIndexAction.java    |  73 +--
 .../nifi/provenance/lucene/DocsReader.java      | 100 ++--
 .../nifi/provenance/lucene/FieldNames.java      |   1 +
 .../nifi/provenance/lucene/IndexManager.java    | 421 ++++++++++++++++
 .../nifi/provenance/lucene/IndexSearch.java     |  52 +-
 .../nifi/provenance/lucene/IndexingAction.java  | 183 ++++---
 .../nifi/provenance/lucene/LuceneUtil.java      |  26 +-
 .../provenance/serialization/RecordReader.java  |  67 +++
 .../provenance/serialization/RecordReaders.java | 108 ++--
 .../provenance/serialization/RecordWriter.java  |   6 +
 .../provenance/serialization/RecordWriters.java |  13 +-
 .../nifi/provenance/toc/StandardTocReader.java  | 108 ++++
 .../nifi/provenance/toc/StandardTocWriter.java  | 147 ++++++
 .../apache/nifi/provenance/toc/TocReader.java   |  58 +++
 .../org/apache/nifi/provenance/toc/TocUtil.java |  37 ++
 .../apache/nifi/provenance/toc/TocWriter.java   |  52 ++
 .../TestPersistentProvenanceRepository.java     | 109 ++--
 .../TestStandardRecordReaderWriter.java         | 180 +++++++
 .../org/apache/nifi/provenance/TestUtil.java    |  82 +++
 .../provenance/toc/TestStandardTocReader.java   |  91 ++++
 .../provenance/toc/TestStandardTocWriter.java   |  64 +++
 27 files changed, 2267 insertions(+), 574 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
index 8294af3..d1ed023 100644
--- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
+++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
@@ -31,6 +31,11 @@ public class ByteCountingInputStream extends InputStream {
         this.in = in;
     }
 
+    public ByteCountingInputStream(final InputStream in, final long initialOffset) {
+        this.in = in;
+        this.bytesSkipped = initialOffset;
+    }
+
     @Override
     public int read() throws IOException {
         final int fromSuper = in.read();

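The new constructor seeds the stream's running byte count with bytes that were consumed before the stream was wrapped, so the wrapper can report absolute file positions when reading starts mid-file. A short usage sketch; the file name and offset below are made up, and in practice the offset would come from a TOC block entry:

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.nifi.stream.io.ByteCountingInputStream;

    public class ResumeAtOffset {
        public static void main(final String[] args) throws IOException {
            final long blockOffset = 4096L; // e.g., taken from the .toc file
            try (final InputStream fis = new FileInputStream("events.prov")) {
                // advance the raw stream to the block boundary first
                long remaining = blockOffset;
                while (remaining > 0) {
                    final long skipped = fis.skip(remaining);
                    if (skipped <= 0) {
                        break; // reached end of file before the offset
                    }
                    remaining -= skipped;
                }
                // the wrapper's count now starts at blockOffset, so positions it
                // reports line up with absolute offsets within the file
                final ByteCountingInputStream in = new ByteCountingInputStream(fis, blockOffset);
                in.read();
            }
        }
    }

The matching ByteCountingOutputStream constructor below serves the same purpose on the write side, for example when appending to a partially written file.
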
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
index d8e1a42..e71937e 100644
--- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
+++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
@@ -27,6 +27,12 @@ public class ByteCountingOutputStream extends OutputStream {
     public ByteCountingOutputStream(final OutputStream out) {
         this.out = out;
     }
+    
+    public ByteCountingOutputStream(final OutputStream out, final long initialByteCount) {
+        this.out = out;
+        this.bytesWritten = initialByteCount;
+    }
+
 
     @Override
     public void write(int b) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 0502cc7..f5c595d 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -21,7 +21,6 @@ import java.io.File;
 import java.io.FileFilter;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -58,6 +57,14 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;
 
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexNotFoundException;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.FSDirectory;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.provenance.expiration.ExpirationAction;
@@ -67,12 +74,11 @@ import org.apache.nifi.provenance.lineage.Lineage;
 import org.apache.nifi.provenance.lineage.LineageComputationType;
 import org.apache.nifi.provenance.lucene.DeleteIndexAction;
 import org.apache.nifi.provenance.lucene.FieldNames;
+import org.apache.nifi.provenance.lucene.IndexManager;
 import org.apache.nifi.provenance.lucene.IndexSearch;
 import org.apache.nifi.provenance.lucene.IndexingAction;
 import org.apache.nifi.provenance.lucene.LineageQuery;
 import org.apache.nifi.provenance.lucene.LuceneUtil;
-import org.apache.nifi.provenance.rollover.CompressionAction;
-import org.apache.nifi.provenance.rollover.RolloverAction;
 import org.apache.nifi.provenance.search.Query;
 import org.apache.nifi.provenance.search.QueryResult;
 import org.apache.nifi.provenance.search.QuerySubmission;
@@ -81,18 +87,12 @@ import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
 import org.apache.nifi.provenance.serialization.RecordWriter;
 import org.apache.nifi.provenance.serialization.RecordWriters;
+import org.apache.nifi.provenance.toc.TocUtil;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.RingBuffer;
 import org.apache.nifi.util.StopWatch;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexNotFoundException;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.search.TopDocs;
-import org.apache.lucene.store.FSDirectory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,7 +102,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     public static final String EVENT_CATEGORY = "Provenance Repository";
     private static final String FILE_EXTENSION = ".prov";
     private static final String TEMP_FILE_SUFFIX = ".prov.part";
-    public static final int SERIALIZATION_VERSION = 7;
+    public static final int SERIALIZATION_VERSION = 8;
     public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
     public static final Pattern INDEX_PATTERN = Pattern.compile("index-\\d+");
     public static final Pattern LOG_FILENAME_PATTERN = Pattern.compile("(\\d+).*\\.prov");
@@ -129,14 +129,14 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     private final AtomicLong streamStartTime = new AtomicLong(System.currentTimeMillis());
     private final RepositoryConfiguration configuration;
     private final IndexConfiguration indexConfig;
+    private final IndexManager indexManager;
     private final boolean alwaysSync;
     private final int rolloverCheckMillis;
 
     private final ScheduledExecutorService scheduledExecService;
-    private final ExecutorService rolloverExecutor;
+    private final ScheduledExecutorService rolloverExecutor;
     private final ExecutorService queryExecService;
 
-    private final List<RolloverAction> rolloverActions = new ArrayList<>();
     private final List<ExpirationAction> expirationActions = new ArrayList<>();
 
     private final IndexingAction indexingAction;
@@ -181,21 +181,17 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         this.maxPartitionMillis = configuration.getMaxEventFileLife(TimeUnit.MILLISECONDS);
         this.maxPartitionBytes = configuration.getMaxEventFileCapacity();
         this.indexConfig = new IndexConfiguration(configuration);
+        this.indexManager = new IndexManager(indexConfig);
         this.alwaysSync = configuration.isAlwaysSync();
         this.rolloverCheckMillis = rolloverCheckMillis;
         
         final List<SearchableField> fields = configuration.getSearchableFields();
         if (fields != null && !fields.isEmpty()) {
             indexingAction = new IndexingAction(this, indexConfig);
-            rolloverActions.add(indexingAction);
         } else {
             indexingAction = null;
         }
 
-        if (configuration.isCompressOnRollover()) {
-            rolloverActions.add(new CompressionAction());
-        }
-
         scheduledExecService = Executors.newScheduledThreadPool(3);
         queryExecService = Executors.newFixedThreadPool(configuration.getQueryThreadPoolSize(), new NamedThreadFactory("Provenance Query Thread"));
 
@@ -204,69 +200,74 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         // disks efficiently. However, the rollover actions can be somewhat CPU intensive, so we double the number of threads in order
         // to account for that.
         final int numRolloverThreads = configuration.getStorageDirectories().size() * 2;
-        rolloverExecutor = Executors.newFixedThreadPool(numRolloverThreads, new NamedThreadFactory("Provenance Repository Rollover Thread"));
+        rolloverExecutor = Executors.newScheduledThreadPool(numRolloverThreads, new NamedThreadFactory("Provenance Repository Rollover Thread"));
     }
 
     @Override
     public void initialize(final EventReporter eventReporter) throws IOException {
-        if (initialized.getAndSet(true)) {
-            return;
-        }
-
-        this.eventReporter = eventReporter;
-
-        recover();
-
-        if (configuration.isAllowRollover()) {
-            writers = createWriters(configuration, idGenerator.get());
-        }
-
-        if (configuration.isAllowRollover()) {
-            scheduledExecService.scheduleWithFixedDelay(new Runnable() {
-                @Override
-                public void run() {
-                    // Check if we need to roll over
-                    if (needToRollover()) {
-                        // it appears that we do need to roll over. Obtain write lock so that we can do so, and then
-                        // confirm that we still need to.
-                        writeLock.lock();
-                        try {
-                            logger.debug("Obtained write lock to perform periodic rollover");
-
-                            if (needToRollover()) {
-                                try {
-                                    rollover(false);
-                                } catch (final Exception e) {
-                                    logger.error("Failed to roll over Provenance Event Log due to {}", e.toString());
-                                    logger.error("", e);
-                                }
-                            }
-                        } finally {
-                            writeLock.unlock();
-                        }
-                    }
-                }
-            }, rolloverCheckMillis, rolloverCheckMillis, TimeUnit.MILLISECONDS);
-
-            scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS);
-            scheduledExecService.scheduleWithFixedDelay(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        purgeOldEvents();
-                    } catch (final Exception e) {
-                        logger.error("Failed to purge old events from Provenance Repo due to {}", e.toString());
-                        if (logger.isDebugEnabled()) {
-                            logger.error("", e);
-                        }
-                        eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to purge old events from Provenance Repo due to " + e.toString());
-                    }
-                }
-            }, 1L, 1L, TimeUnit.MINUTES);
-
-            expirationActions.add(new DeleteIndexAction(this, indexConfig));
-            expirationActions.add(new FileRemovalAction());
-        }
+    	writeLock.lock();
+    	try {
+	        if (initialized.getAndSet(true)) {
+	            return;
+	        }
+	
+	        this.eventReporter = eventReporter;
+	
+	        recover();
+	
+	        if (configuration.isAllowRollover()) {
+	            writers = createWriters(configuration, idGenerator.get());
+	        }
+	
+	        if (configuration.isAllowRollover()) {
+	            scheduledExecService.scheduleWithFixedDelay(new Runnable() {
+	                @Override
+	                public void run() {
+	                    // Check if we need to roll over
+	                    if (needToRollover()) {
+	                        // it appears that we do need to roll over. Obtain write lock so that we can do so, and then
+	                        // confirm that we still need to.
+	                        writeLock.lock();
+	                        try {
+	                            logger.debug("Obtained write lock to perform periodic rollover");
+	
+	                            if (needToRollover()) {
+	                                try {
+	                                    rollover(false);
+	                                } catch (final Exception e) {
+	                                    logger.error("Failed to roll over Provenance Event Log due to {}", e.toString());
+	                                    logger.error("", e);
+	                                }
+	                            }
+	                        } finally {
+	                            writeLock.unlock();
+	                        }
+	                    }
+	                }
+	            }, rolloverCheckMillis, rolloverCheckMillis, TimeUnit.MILLISECONDS);
+	
+	            scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS);
+	            scheduledExecService.scheduleWithFixedDelay(new Runnable() {
+	                @Override
+	                public void run() {
+	                    try {
+	                        purgeOldEvents();
+	                    } catch (final Exception e) {
+	                        logger.error("Failed to purge old events from Provenance Repo due to {}", e.toString());
+	                        if (logger.isDebugEnabled()) {
+	                            logger.error("", e);
+	                        }
+	                        eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to purge old events from Provenance Repo due to " + e.toString());
+	                    }
+	                }
+	            }, 1L, 1L, TimeUnit.MINUTES);
+	
+	            expirationActions.add(new DeleteIndexAction(this, indexConfig, indexManager));
+	            expirationActions.add(new FileRemovalAction());
+	        }
+    	} finally {
+    		writeLock.unlock();
+    	}
     }
 
     private static RepositoryConfiguration createRepositoryConfiguration() throws IOException {
@@ -334,10 +335,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             final File journalDirectory = new File(storageDirectory, "journals");
             final File journalFile = new File(journalDirectory, String.valueOf(initialRecordId) + ".journal." + i);
 
-            writers[i] = RecordWriters.newRecordWriter(journalFile);
+            writers[i] = RecordWriters.newRecordWriter(journalFile, false, false);
             writers[i].writeHeader();
         }
 
+        logger.info("Created new Provenance Event Writers for events starting with ID {}", initialRecordId);
         return writers;
     }
 
@@ -568,16 +570,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             // Read the records in the last file to find its max id
             if (greatestMinIdFile != null) {
                 try (final RecordReader recordReader = RecordReaders.newRecordReader(greatestMinIdFile, Collections.<Path>emptyList())) {
-                    StandardProvenanceEventRecord record;
-
-                    try {
-                        while ((record = recordReader.nextRecord()) != null) {
-                            if (record.getEventId() > maxId) {
-                                maxId = record.getEventId();
-                            }
-                        }
-                    } catch (final EOFException eof) {
-                    }
+                	maxId = recordReader.getMaxEventId();
                 }
             }
 
@@ -599,46 +592,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         }
 
         logger.info("Recovered {} records", recordsRecovered);
-
-        final List<RolloverAction> rolloverActions = this.rolloverActions;
-        final Runnable retroactiveRollover = new Runnable() {
-            @Override
-            public void run() {
-                for (File toRecover : filesToRecover) {
-                    final String baseFileName = LuceneUtil.substringBefore(toRecover.getName(), ".");
-                    final Long fileFirstEventId = Long.parseLong(baseFileName);
-
-                    for (final RolloverAction action : rolloverActions) {
-                        if (!action.hasBeenPerformed(toRecover)) {
-                            try {
-                                final StopWatch stopWatch = new StopWatch(true);
-
-                                toRecover = action.execute(toRecover);
-
-                                stopWatch.stop();
-                                final String duration = stopWatch.getDuration();
-                                logger.info("Successfully performed retroactive action {} against {} in {}", action, toRecover, duration);
-
-                                // update our map of id to Path
-                                final Map<Long, Path> updatedMap = addToPathMap(fileFirstEventId, toRecover.toPath());
-                                logger.trace("After retroactive rollover action {}, Path Map: {}", action, updatedMap);
-                            } catch (final Exception e) {
-                                logger.error("Failed to perform retroactive rollover actions on {} due to {}", toRecover, e.toString());
-                                logger.error("", e);
-                                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to perform retroactive rollover actions on " + toRecover + " due to " + e.toString());
-                            }
-                        }
-                    }
-                }
-            }
-        };
-        rolloverExecutor.submit(retroactiveRollover);
-
         recoveryFinished.set(true);
     }
 
     @Override
-    public void close() throws IOException {
+    public synchronized void close() throws IOException {
         writeLock.lock();
         try {
             logger.debug("Obtained write lock for close");
@@ -648,6 +606,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             rolloverExecutor.shutdownNow();
             queryExecService.shutdownNow();
 
+            indexManager.close();
+            
             for (final RecordWriter writer : writers) {
                 writer.close();
             }
@@ -963,7 +923,17 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             for (final RecordWriter writer : writers) {
                 final File writerFile = writer.getFile();
                 journalsToMerge.add(writerFile);
-                writer.close();
+                try {
+                	writer.close();
+                } catch (final IOException ioe) {
+                	logger.warn("Failed to close {} due to {}", writer, ioe.toString());
+                	if ( logger.isDebugEnabled() ) {
+                		logger.warn("", ioe);
+                	}
+                }
+            }
+            if ( logger.isDebugEnabled() ) {
+            	logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
             }
 
             writers = createWriters(configuration, idGenerator.get());
@@ -974,60 +944,29 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             final List<File> storageDirs = configuration.getStorageDirectories();
             final File storageDir = storageDirs.get((int) (storageDirIdx % storageDirs.size()));
 
-            final List<RolloverAction> actions = rolloverActions;
+            final AtomicReference<Future<?>> futureReference = new AtomicReference<>();
             final int recordsWritten = recordsWrittenSinceRollover.getAndSet(0);
             final Runnable rolloverRunnable = new Runnable() {
                 @Override
                 public void run() {
-                    final File fileRolledOver;
-
-                    try {
-                        fileRolledOver = mergeJournals(journalsToMerge, storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, latestRecords);
-                        repoDirty.set(false);
-                    } catch (final IOException ioe) {
-                        repoDirty.set(true);
-                        logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
-                        logger.error("", ioe);
-                        return;
-                    }
-
-                    if (fileRolledOver == null) {
-                        return;
-                    }
-                    File file = fileRolledOver;
-
-                    for (final RolloverAction action : actions) {
-                        try {
-                            final StopWatch stopWatch = new StopWatch(true);
-                            file = action.execute(file);
-                            stopWatch.stop();
-                            logger.info("Successfully performed Rollover Action {} for {} in {}", action, file, stopWatch.getDuration());
-
-                            // update our map of id to Path
-                            // need lock to update the map, even though it's an AtomicReference, AtomicReference allows those doing a
-                            // get() to obtain the most up-to-date version but we use a writeLock to prevent multiple threads modifying
-                            // it at one time
-                            writeLock.lock();
-                            try {
-                                final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
-                                SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator());
-                                newIdToPathMap.putAll(idToPathMap.get());
-                                newIdToPathMap.put(fileFirstEventId, file.toPath());
-                                idToPathMap.set(newIdToPathMap);
-                                logger.trace("After rollover action {}, path map: {}", action, newIdToPathMap);
-                            } finally {
-                                writeLock.unlock();
-                            }
-                        } catch (final Throwable t) {
-                            logger.error("Failed to perform Rollover Action {} for {}: got Exception {}",
-                                    action, fileRolledOver, t.toString());
-                            logger.error("", t);
-
-                            return;
-                        }
-                    }
-
-                    if (actions.isEmpty()) {
+                	try {
+	                    final File fileRolledOver;
+	
+	                    try {
+	                        fileRolledOver = mergeJournals(journalsToMerge, storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, latestRecords);
+	                        repoDirty.set(false);
+	                    } catch (final IOException ioe) {
+	                        repoDirty.set(true);
+	                        logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
+	                        logger.error("", ioe);
+	                        return;
+	                    }
+	
+	                    if (fileRolledOver == null) {
+	                        return;
+	                    }
+	                    File file = fileRolledOver;
+	
                         // update our map of id to Path
                         // need lock to update the map, even though it's an AtomicReference, AtomicReference allows those doing a
                         // get() to obtain the most up-to-date version but we use a writeLock to prevent multiple threads modifying
@@ -1042,35 +981,37 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                         } finally {
                             writeLock.unlock();
                         }
-                    }
-
-                    logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
-                    rolloverCompletions.getAndIncrement();
+	
+	                    logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
+	                    rolloverCompletions.getAndIncrement();
+	                    
+	                    // We have finished successfully. Cancel the future so that we don't run anymore
+	                    Future<?> future;
+	                    while ((future = futureReference.get()) == null) {
+	                    	try {
+	                    		Thread.sleep(10L);
+	                    	} catch (final InterruptedException ie) {
+	                    	}
+	                    }
+	                    
+	                    future.cancel(false);
+	                } catch (final Throwable t) {
+	                	logger.error("Failed to roll over Provenance Repository due to {}", t.toString());
+	                	logger.error("", t);
+	                }
                 }
             };
 
-            rolloverExecutor.submit(rolloverRunnable);
+            // We are going to schedule the future to run every 10 seconds. This allows us to keep retrying if we
+            // fail for some reason. When we succeed, the Runnable will cancel itself.
+            final Future<?> future = rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, 10, TimeUnit.SECONDS);
+            futureReference.set(future);
 
             streamStartTime.set(System.currentTimeMillis());
             bytesWrittenSinceRollover.set(0);
         }
     }
 
-    private SortedMap<Long, Path> addToPathMap(final Long firstEventId, final Path path) {
-        SortedMap<Long, Path> unmodifiableMap;
-        boolean updated = false;
-        do {
-            final SortedMap<Long, Path> existingMap = idToPathMap.get();
-            final SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator());
-            newIdToPathMap.putAll(existingMap);
-            newIdToPathMap.put(firstEventId, path);
-            unmodifiableMap = Collections.unmodifiableSortedMap(newIdToPathMap);
-
-            updated = idToPathMap.compareAndSet(existingMap, unmodifiableMap);
-        } while (!updated);
-
-        return unmodifiableMap;
-    }
 
     private Set<File> recoverJournalFiles() throws IOException {
         if (!configuration.isAllowRollover()) {
@@ -1093,6 +1034,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             }
 
             for (final File journalFile : journalFiles) {
+            	if ( journalFile.isDirectory() ) {
+            		continue;
+            	}
+            	
                 final String basename = LuceneUtil.substringBefore(journalFile.getName(), ".");
                 List<File> files = journalMap.get(basename);
                 if (files == null) {
@@ -1135,22 +1080,74 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         return mergedFile;
     }
 
-    static File mergeJournals(final List<File> journalFiles, final File storageDir, final File mergedFile, final EventReporter eventReporter, final RingBuffer<ProvenanceEventRecord> ringBuffer) throws IOException {
-        final long startNanos = System.nanoTime();
+    File mergeJournals(final List<File> journalFiles, final File storageDir, final File mergedFile, final EventReporter eventReporter, final RingBuffer<ProvenanceEventRecord> ringBuffer) throws IOException {
+    	logger.debug("Merging {} to {}", journalFiles, mergedFile);
+    	if ( this.closed ) {
+    		logger.info("Provenance Repository has been closed; will not merge journal files to {}", mergedFile);
+    		return null;
+    	}
+    	
         if (journalFiles.isEmpty()) {
             return null;
         }
 
-        if (mergedFile.exists()) {
-            throw new FileAlreadyExistsException("Cannot Merge " + journalFiles.size() + " Journal Files into Merged Provenance Log File " + mergedFile.getAbsolutePath() + " because the Merged File already exists");
+        Collections.sort(journalFiles, new Comparator<File>() {
+			@Override
+			public int compare(final File o1, final File o2) {
+				final String suffix1 = LuceneUtil.substringAfterLast(o1.getName(), ".");
+				final String suffix2 = LuceneUtil.substringAfterLast(o2.getName(), ".");
+
+				try {
+					final int journalIndex1 = Integer.parseInt(suffix1);
+					final int journalIndex2 = Integer.parseInt(suffix2);
+					return Integer.compare(journalIndex1, journalIndex2);
+				} catch (final NumberFormatException nfe) {
+					return o1.getName().compareTo(o2.getName());
+				}
+			}
+        });
+        
+        final String firstJournalFile = journalFiles.get(0).getName();
+        final String firstFileSuffix = LuceneUtil.substringAfterLast(firstJournalFile, ".");
+        if (!firstFileSuffix.equals("0")) {
+        	if ( mergedFile.exists() ) {
+        		logger.warn("Merged Journal File {} already exists; however, all partial journal files also exist "
+        				+ "so assuming that the merge did not finish. Repeating the procedure to ensure consistency.", mergedFile);
+        		
+        		final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, indexManager);
+        		try {
+        			deleteAction.execute(mergedFile);
+        		} catch (final Exception e) {
+        			logger.warn("Failed to delete records from Journal File {} from the index; this could potentially result in duplicates. Failure was due to {}", mergedFile, e.toString());
+        			if ( logger.isDebugEnabled() ) {
+        				logger.warn("", e);
+        			}
+        		}
+        		
+        	} else {
+	        	logger.warn("Cannot merge journal files {} because the first file was expected to end with extension '.0' "
+	        			+ "but it did not; assuming that the files were already merged but only some finished deletion "
+	        			+ "before restart. Deleting remaining journal files.", journalFiles);
+	        	
+	        	for ( final File file : journalFiles ) {
+	        		if ( !file.delete() && file.exists() ) {
+	        			logger.warn("Failed to delete unneeded journal file {}; this file should be cleaned up manually", file);
+	        		}
+	        	}
+	        	
+	        	return null;
+        	}
         }
-
-        final File tempMergedFile = new File(mergedFile.getParentFile(), mergedFile.getName() + ".part");
+        
+        final long startNanos = System.nanoTime();
 
         // Map each journal to a RecordReader
         final List<RecordReader> readers = new ArrayList<>();
         int records = 0;
 
+        final boolean isCompress = configuration.isCompressOnRollover();
+        final File writerFile = isCompress ? new File(mergedFile.getParentFile(), mergedFile.getName() + ".gz") : mergedFile;
+
         try {
             for (final File journalFile : journalFiles) {
                 try {
@@ -1203,32 +1200,47 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
             // loop over each entry in the map, persisting the records to the merged file in order, and populating the map
             // with the next entry from the journal file from which the previous record was written.
-            try (final RecordWriter writer = RecordWriters.newRecordWriter(tempMergedFile)) {
+            try (final RecordWriter writer = RecordWriters.newRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) {
                 writer.writeHeader();
 
-                while (!recordToReaderMap.isEmpty()) {
-                    final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
-                    final StandardProvenanceEventRecord record = entry.getKey();
-                    final RecordReader reader = entry.getValue();
-
-                    writer.writeRecord(record, record.getEventId());
-                    ringBuffer.add(record);
-                    records++;
-
-                    // Remove this entry from the map
-                    recordToReaderMap.remove(record);
-
-                    // Get the next entry from this reader and add it to the map
-                    StandardProvenanceEventRecord nextRecord = null;
-
-                    try {
-                        nextRecord = reader.nextRecord();
-                    } catch (final EOFException eof) {
-                    }
-
-                    if (nextRecord != null) {
-                        recordToReaderMap.put(nextRecord, reader);
-                    }
+                final IndexingAction indexingAction = new IndexingAction(this, indexConfig);
+                
+                final IndexWriter indexWriter = indexManager.borrowIndexWriter(writerFile);
+                try {
+	                while (!recordToReaderMap.isEmpty()) {
+	                    final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
+	                    final StandardProvenanceEventRecord record = entry.getKey();
+	                    final RecordReader reader = entry.getValue();
+	
+	                    writer.writeRecord(record, record.getEventId());
+	                    final int blockIndex = writer.getTocWriter().getCurrentBlockIndex();
+	                    
+	                    indexingAction.index(record, indexWriter, blockIndex);
+	                    indexConfig.setMaxIdIndexed(record.getEventId());
+	                    
+	                    ringBuffer.add(record);
+	                    
+	                    records++;
+	
+	                    // Remove this entry from the map
+	                    recordToReaderMap.remove(record);
+	
+	                    // Get the next entry from this reader and add it to the map
+	                    StandardProvenanceEventRecord nextRecord = null;
+	
+	                    try {
+	                        nextRecord = reader.nextRecord();
+	                    } catch (final EOFException eof) {
+	                    }
+	
+	                    if (nextRecord != null) {
+	                        recordToReaderMap.put(nextRecord, reader);
+	                    }
+	                }
+	                
+	                indexWriter.commit();
+                } finally {
+                	indexManager.returnIndexWriter(writerFile, indexWriter);
                 }
             }
         } finally {
@@ -1240,37 +1252,22 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             }
         }
 
-        // Attempt to rename. Keep trying for a bit if we fail. This happens often if we have some external process
-        // that locks files, such as a virus scanner.
-        boolean renamed = false;
-        for (int i = 0; i < 10 && !renamed; i++) {
-            renamed = tempMergedFile.renameTo(mergedFile);
-            if (!renamed) {
-                try {
-                    Thread.sleep(100L);
-                } catch (final InterruptedException ie) {
-                }
-            }
-        }
-
-        if (!renamed) {
-            throw new IOException("Failed to merge journal files into single merged file " + mergedFile.getAbsolutePath() + " because " + tempMergedFile.getAbsolutePath() + " could not be renamed");
-        }
-
         // Success. Remove all of the journal files, as they're no longer needed, now that they've been merged.
         for (final File journalFile : journalFiles) {
-            if (!journalFile.delete()) {
-                if (journalFile.exists()) {
-                    logger.warn("Failed to remove temporary journal file {}; this file should be cleaned up manually", journalFile.getAbsolutePath());
-                    eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal file " + journalFile.getAbsolutePath() + "; this file should be cleaned up manually");
-                } else {
-                    logger.warn("Failed to remove temporary journal file {} because it no longer exists", journalFile.getAbsolutePath());
-                }
+            if (!journalFile.delete() && journalFile.exists()) {
+                logger.warn("Failed to remove temporary journal file {}; this file should be cleaned up manually", journalFile.getAbsolutePath());
+                eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal file " + journalFile.getAbsolutePath() + "; this file should be cleaned up manually");
+            }
+            
+            final File tocFile = TocUtil.getTocFile(journalFile);
+            if (!tocFile.delete() && tocFile.exists()) {
+                logger.warn("Failed to remove temporary journal TOC file {}; this file should be cleaned up manually", tocFile.getAbsolutePath());
+                eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal TOC file " + tocFile.getAbsolutePath() + "; this file should be cleaned up manually");
             }
         }
 
         if (records == 0) {
-            mergedFile.delete();
+            writerFile.delete();
             return null;
         } else {
             final long nanos = System.nanoTime() - startNanos;
@@ -1278,7 +1275,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             logger.info("Successfully merged {} journal files ({} records) into single Provenance Log File {} in {} milliseconds", journalFiles.size(), records, mergedFile, millis);
         }
 
-        return mergedFile;
+        return writerFile;
     }
 
     @Override
@@ -1779,7 +1776,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         @Override
         public void run() {
             try {
-                final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir);
+                final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager);
                 final StandardQueryResult queryResult = search.search(query, retrievalCount);
                 submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount());
                 if (queryResult.isFinished()) {

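A note on the merge loop in the hunk above: it is a k-way merge. One pending record per journal is held in a sorted map keyed by event ID; the smallest is written (and, with this change, also indexed along with its block index), and the map is refilled from the reader that supplied it. A minimal, self-contained sketch of the same pattern over plain pre-sorted iterators (illustrative names only, not the repository's classes):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class KWayMerge {
        // Merges already-sorted sources into one sorted list. Like the
        // record-to-reader map above, this assumes no two sources ever yield
        // an equal key (event IDs are unique across journals).
        public static <T extends Comparable<T>> List<T> merge(final List<Iterator<T>> sources) {
            final TreeMap<T, Iterator<T>> next = new TreeMap<>();
            for (final Iterator<T> source : sources) {
                if (source.hasNext()) {
                    next.put(source.next(), source);
                }
            }
            final List<T> merged = new ArrayList<>();
            while (!next.isEmpty()) {
                final Map.Entry<T, Iterator<T>> smallest = next.pollFirstEntry();
                merged.add(smallest.getKey());
                final Iterator<T> source = smallest.getValue();
                if (source.hasNext()) {
                    next.put(source.next(), source); // refill from the source just consumed
                }
            }
            return merged;
        }
    }
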
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
index d47df4f..3951591 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
@@ -33,7 +33,8 @@ public class RepositoryConfiguration {
     private long eventFileBytes = 1024L * 1024L * 5L;   // 5 MB
     private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB
     private int journalCount = 16;
-
+    private int compressionBlockBytes = 1024 * 1024;
+    
     private List<SearchableField> searchableFields = new ArrayList<>();
     private List<SearchableField> searchableAttributes = new ArrayList<>();
     private boolean compress = true;
@@ -49,7 +50,16 @@ public class RepositoryConfiguration {
         return allowRollover;
     }
 
-    /**
+    
+    public int getCompressionBlockBytes() {
+		return compressionBlockBytes;
+	}
+
+	public void setCompressionBlockBytes(int compressionBlockBytes) {
+		this.compressionBlockBytes = compressionBlockBytes;
+	}
+
+	/**
      * Specifies where the repository will store data
      *
      * @return

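The new compressionBlockBytes property controls how much uncompressed data goes into each compression block before the writer starts a new one (and records a new offset in the table of contents). Smaller blocks make random access cheaper, since a reader decompresses at most one block to reach an event; larger blocks compress slightly better. A sketch of tuning it, using only the setter added above (1 MB is the default):

    final RepositoryConfiguration config = new RepositoryConfiguration();
    // 1 MB of uncompressed event data per compression block
    config.setCompressionBlockBytes(1024 * 1024);
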
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
index 5e4744b..9bbf195 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
@@ -17,41 +17,173 @@
 package org.apache.nifi.provenance;
 
 import java.io.DataInputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.zip.GZIPInputStream;
 
+import org.apache.nifi.provenance.serialization.RecordReader;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.stream.io.BufferedInputStream;
 import org.apache.nifi.stream.io.ByteCountingInputStream;
+import org.apache.nifi.stream.io.LimitingInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.provenance.serialization.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class StandardRecordReader implements RecordReader {
-
-    private final DataInputStream dis;
-    private final ByteCountingInputStream byteCountingIn;
+	private static final Logger logger = LoggerFactory.getLogger(StandardRecordReader.class);
+	
+	private final ByteCountingInputStream rawInputStream;
     private final String filename;
     private final int serializationVersion;
+    private final boolean compressed;
+    private final TocReader tocReader;
+    private final int headerLength;
+    
+    private DataInputStream dis;
+    private ByteCountingInputStream byteCountingIn;
+
+    public StandardRecordReader(final InputStream in, final String filename) throws IOException {
+    	this(in, filename, null);
+    }
+    
+    public StandardRecordReader(final InputStream in, final String filename, final TocReader tocReader) throws IOException {
+    	logger.trace("Creating RecordReader for {}", filename);
+    	
+    	rawInputStream = new ByteCountingInputStream(in);
+
+        final InputStream limitedStream;
+        if ( tocReader == null ) {
+        	limitedStream = rawInputStream;
+        } else {
+        	final long offset1 = tocReader.getBlockOffset(1);
+        	if ( offset1 < 0 ) {
+        		limitedStream = rawInputStream;
+        	} else {
+        		limitedStream = new LimitingInputStream(rawInputStream, offset1 - rawInputStream.getBytesConsumed());
+        	}
+        }
+        
+    	final InputStream readableStream;
+        if (filename.endsWith(".gz")) {
+            readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream));
+            compressed = true;
+        } else {
+            readableStream = new BufferedInputStream(limitedStream);
+            compressed = false;
+        }
 
-    public StandardRecordReader(final InputStream in, final int serializationVersion, final String filename) {
-        if (serializationVersion < 1 || serializationVersion > 7) {
-            throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion + " and supported versions are 1-6");
+        byteCountingIn = new ByteCountingInputStream(readableStream);
+        dis = new DataInputStream(byteCountingIn);
+        
+        final String repoClassName = dis.readUTF();
+        final int serializationVersion = dis.readInt();
+        headerLength = repoClassName.getBytes(StandardCharsets.UTF_8).length + 2 + 4;	// 2 bytes for string length, 4 for integer.
+        
+        if (serializationVersion < 1 || serializationVersion > 8) {
+            throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion + " and supported versions are 1-8");
         }
 
-        byteCountingIn = new ByteCountingInputStream(in);
-        this.dis = new DataInputStream(byteCountingIn);
         this.serializationVersion = serializationVersion;
         this.filename = filename;
+        this.tocReader = tocReader;
+    }
+
+    @Override
+    public void skipToBlock(final int blockIndex) throws IOException {
+    	if ( tocReader == null ) {
+    		throw new IllegalStateException("Cannot skip to block " + blockIndex + " for Provenance Log " + filename + " because no Table-of-Contents file was found for this Log");
+    	}
+    	
+    	if ( blockIndex < 0 ) {
+    		throw new IllegalArgumentException("Cannot skip to block " + blockIndex + " because the value is negative");
+    	}
+    	
+    	if ( blockIndex == getBlockIndex() ) {
+    		return;
+    	}
+    	
+    	final long offset = tocReader.getBlockOffset(blockIndex);
+    	if ( offset < 0 ) {
+    		throw new IOException("Unable to find block " + blockIndex + " in Provenance Log " + filename);
+    	}
+    	
+    	final long curOffset = rawInputStream.getBytesConsumed();
+    	
+    	final long bytesToSkip = offset - curOffset;
+    	if ( bytesToSkip >= 0 ) {
+	    	try {
+	    		StreamUtils.skip(rawInputStream, bytesToSkip);
+	    		logger.debug("Skipped stream from offset {} to {} ({} bytes skipped)", curOffset, offset, bytesToSkip);
+	    	} catch (final IOException e) {
+	    		throw new IOException("Failed to skip to offset " + offset + " for block " + blockIndex + " of Provenance Log " + filename, e);
+	    	}
+	
+	    	resetStreamForNextBlock();
+    	}
     }
+    
+    private void resetStreamForNextBlock() throws IOException {
+    	final InputStream limitedStream;
+        if ( tocReader == null ) {
+        	limitedStream = rawInputStream;
+        } else {
+        	final long offset = tocReader.getBlockOffset(1 + getBlockIndex());
+        	if ( offset < 0 ) {
+        		limitedStream = rawInputStream;
+        	} else {
+        		limitedStream = new LimitingInputStream(rawInputStream, offset - rawInputStream.getBytesConsumed());
+        	}
+        }
+    	
+    	final InputStream readableStream;
+        if (compressed) {
+            readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream));
+        } else {
+            readableStream = new BufferedInputStream(limitedStream);
+        }
 
+        byteCountingIn = new ByteCountingInputStream(readableStream, rawInputStream.getBytesConsumed());
+        dis = new DataInputStream(byteCountingIn);
+    }
+    
+    
+    @Override
+    public TocReader getTocReader() {
+    	return tocReader;
+    }
+    
+    @Override
+    public boolean isBlockIndexAvailable() {
+    	return tocReader != null;
+    }
+    
+    @Override
+    public int getBlockIndex() {
+    	if ( tocReader == null ) {
+    		throw new IllegalStateException("Cannot determine Block Index because no Table-of-Contents could be found for Provenance Log " + filename);
+    	}
+    	
+    	return tocReader.getBlockIndex(rawInputStream.getBytesConsumed());
+    }
+    
+    @Override
+    public long getBytesConsumed() {
+    	return byteCountingIn.getBytesConsumed();
+    }
+    
     private StandardProvenanceEventRecord readPreVersion6Record() throws IOException {
         final long startOffset = byteCountingIn.getBytesConsumed();
 
-        if (!isData(byteCountingIn)) {
+        if (!isData()) {
             return null;
         }
 
@@ -137,7 +269,7 @@ public class StandardRecordReader implements RecordReader {
 
         final long startOffset = byteCountingIn.getBytesConsumed();
 
-        if (!isData(byteCountingIn)) {
+        if (!isData()) {
             return null;
         }
 
@@ -242,9 +374,17 @@ public class StandardRecordReader implements RecordReader {
     }
 
     private String readUUID(final DataInputStream in) throws IOException {
-        final long msb = in.readLong();
-        final long lsb = in.readLong();
-        return new UUID(msb, lsb).toString();
+    	if ( serializationVersion < 8 ) {
+	        final long msb = in.readLong();
+	        final long lsb = in.readLong();
+	        return new UUID(msb, lsb).toString();
+    	} else {
+    		// Before version 8, we serialized UUIDs as two longs in order to
+    		// write less data. However, in version 8 we changed to just writing
+    		// out the string, because calling UUID.fromString is extremely expensive.
+    		// In the end, since we generally compress, the savings are minimal anyway.
+    		return in.readUTF();
+    	}
     }
 
     private String readNullableString(final DataInputStream in) throws IOException {
@@ -272,16 +412,58 @@ public class StandardRecordReader implements RecordReader {
         return new String(strBytes, "UTF-8");
     }
 
-    private boolean isData(final InputStream in) throws IOException {
-        in.mark(1);
-        final int nextByte = in.read();
-        in.reset();
+    private boolean isData() throws IOException {
+        byteCountingIn.mark(1);
+        int nextByte = byteCountingIn.read();
+        byteCountingIn.reset();
+        
+        if ( nextByte < 0 ) {
+        	try {
+        		resetStreamForNextBlock();
+        	} catch (final EOFException eof) {
+        		return false;
+        	}
+        	
+            byteCountingIn.mark(1);
+            nextByte = byteCountingIn.read();
+            byteCountingIn.reset();
+        }
+        
         return (nextByte >= 0);
     }
+    
+    @Override
+    public long getMaxEventId() throws IOException {
+    	if ( tocReader != null ) {
+    		final long lastBlockOffset = tocReader.getLastBlockOffset();
+    		skipToBlock(tocReader.getBlockIndex(lastBlockOffset));
+    	}
+    	
+    	ProvenanceEventRecord record;
+    	ProvenanceEventRecord lastRecord = null;
+    	try {
+	    	while ((record = nextRecord()) != null) {
+	    		lastRecord = record;
+	    	}
+    	} catch (final EOFException eof) {
+    		// This can happen if we stop NiFi while the record is being written.
+    		// This is OK; we just ignore this record. The session will not have been
+    		// committed, so we can just process the FlowFile again.
+    	}
+    	
+    	return (lastRecord == null) ? -1L : lastRecord.getEventId();
+    }
 
     @Override
     public void close() throws IOException {
+    	logger.trace("Closing Record Reader for {}", filename);
+    	
         dis.close();
+        rawInputStream.close();
+        
+        if ( tocReader != null ) {
+        	tocReader.close();
+        }
     }
 
     @Override
@@ -291,7 +473,10 @@ public class StandardRecordReader implements RecordReader {
 
     @Override
     public void skipTo(final long position) throws IOException {
-        final long currentPosition = byteCountingIn.getBytesConsumed();
+    	// We subtract headerLength from the number of bytes consumed because we used to
+    	// consider the offset of the first record "0"; now we consider it to be whatever
+    	// position it really is in the stream.
+        final long currentPosition = byteCountingIn.getBytesConsumed() - headerLength;
         if (currentPosition == position) {
             return;
         }

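The reader changes above rely on a property of the GZIP format: a file made of several concatenated GZIP members can be entered at any member boundary, so a table of raw byte offsets is enough for random access. A stripped-down sketch of that read path, independent of the classes above (the block offset and uncompressed length are assumed to come from a TOC-like structure):

    import java.io.BufferedInputStream;
    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.zip.GZIPInputStream;

    public class BlockRead {
        // Reads one block from a file whose blocks are independent, concatenated
        // GZIP members. blockOffset is the raw byte offset of the member (from the
        // TOC); uncompressedLen is the block's size once decompressed.
        public static byte[] readBlock(final String path, final long blockOffset,
                final int uncompressedLen) throws IOException {
            try (FileInputStream fis = new FileInputStream(path)) {
                long toSkip = blockOffset;
                while (toSkip > 0) {
                    final long skipped = fis.skip(toSkip);
                    if (skipped <= 0) {
                        throw new IOException("Unable to seek to offset " + blockOffset + " in " + path);
                    }
                    toSkip -= skipped;
                }
                // Decompression can begin here because each block starts a fresh GZIP member.
                final DataInputStream dis = new DataInputStream(
                        new BufferedInputStream(new GZIPInputStream(fis)));
                final byte[] block = new byte[uncompressedLen];
                dis.readFully(block);
                return block;
            }
        }
    }
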
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
index df93084..dbb2c48 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
@@ -19,38 +19,54 @@ package org.apache.nifi.provenance;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.provenance.toc.TocWriter;
 import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
 import org.apache.nifi.stream.io.DataOutputStream;
-import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.stream.io.GZIPOutputStream;
+import org.apache.nifi.stream.io.NonCloseableOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class StandardRecordWriter implements RecordWriter {
-
+	private static final Logger logger = LoggerFactory.getLogger(StandardRecordWriter.class);
+	
     private final File file;
-    private final DataOutputStream out;
-    private final ByteCountingOutputStream byteCountingOut;
     private final FileOutputStream fos;
+    private final ByteCountingOutputStream rawOutStream;
+    private final TocWriter tocWriter;
+    private final boolean compressed;
+    private final int uncompressedBlockSize;
+    
+    private DataOutputStream out;
+    private ByteCountingOutputStream byteCountingOut;
+    private long lastBlockOffset = 0L;
     private int recordCount = 0;
 
     private final Lock lock = new ReentrantLock();
 
-    public StandardRecordWriter(final File file) throws IOException {
+    
+    public StandardRecordWriter(final File file, final TocWriter writer, final boolean compressed, final int uncompressedBlockSize) throws IOException {
+    	logger.trace("Creating Record Writer for {}", file.getName());
+    	
         this.file = file;
+        this.compressed = compressed;
         this.fos = new FileOutputStream(file);
-        this.byteCountingOut = new ByteCountingOutputStream(new BufferedOutputStream(fos, 65536));
-        this.out = new DataOutputStream(byteCountingOut);
+        rawOutStream = new ByteCountingOutputStream(fos);
+        this.uncompressedBlockSize = uncompressedBlockSize;
+        
+        this.tocWriter = writer;
     }
 
     static void writeUUID(final DataOutputStream out, final String uuid) throws IOException {
-        final UUID uuidObj = UUID.fromString(uuid);
-        out.writeLong(uuidObj.getMostSignificantBits());
-        out.writeLong(uuidObj.getLeastSignificantBits());
+    	out.writeUTF(uuid);
     }
 
     static void writeUUIDs(final DataOutputStream out, final Collection<String> list) throws IOException {
@@ -69,18 +85,67 @@ public class StandardRecordWriter implements RecordWriter {
         return file;
     }
 
-    @Override
+	@Override
     public synchronized void writeHeader() throws IOException {
+        lastBlockOffset = rawOutStream.getBytesWritten();
+        resetWriteStream();
+        
         out.writeUTF(PersistentProvenanceRepository.class.getName());
         out.writeInt(PersistentProvenanceRepository.SERIALIZATION_VERSION);
         out.flush();
     }
+    
+    private void resetWriteStream() throws IOException {
+    	if ( out != null ) {
+    		out.flush();
+    	}
+
+    	final long byteOffset = (byteCountingOut == null) ? rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten();
+    	
+    	final OutputStream writableStream;
+    	if ( compressed ) {
+    		// because of the way that GZIPOutputStream works, we need to call close() on it in order for it
+    		// to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
+    		// the underlying OutputStream in a NonCloseableOutputStream
+    		if ( out != null ) {
+    			out.close();
+    		}
+
+        	if ( tocWriter != null ) {
+        		tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
+        	}
+
+    		writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536);
+    	} else {
+        	if ( tocWriter != null ) {
+        		tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
+        	}
+
+    		writableStream = new BufferedOutputStream(rawOutStream, 65536);
+    	}
+    	
+        this.byteCountingOut = new ByteCountingOutputStream(writableStream, byteOffset);
+        this.out = new DataOutputStream(byteCountingOut);
+    }
+    
 
     @Override
     public synchronized long writeRecord(final ProvenanceEventRecord record, long recordIdentifier) throws IOException {
         final ProvenanceEventType recordType = record.getEventType();
         final long startBytes = byteCountingOut.getBytesWritten();
 
+        // add a new block to the TOC if needed.
+        if ( tocWriter != null && (startBytes - lastBlockOffset >= uncompressedBlockSize) ) {
+        	lastBlockOffset = startBytes;
+        	
+        	if ( compressed ) {
+        		// because of the way that GZIPOutputStream works, we need to call close() on it in order for it
+        		// to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
+        		// the underlying OutputStream in a NonCloseableOutputStream
+        		resetWriteStream();
+        	}
+        }
+        
         out.writeLong(recordIdentifier);
         out.writeUTF(record.getEventType().name());
         out.writeLong(record.getEventTime());
@@ -196,13 +261,24 @@ public class StandardRecordWriter implements RecordWriter {
 
     @Override
     public synchronized void close() throws IOException {
+    	logger.trace("Closing Record Writer for {}", file.getName());
+    	
         lock();
         try {
-            out.flush();
-            out.close();
+        	try {
+        		out.flush();
+        		out.close();
+        	} finally {
+        		rawOutStream.close();
+            
+	            if ( tocWriter != null ) {
+	            	tocWriter.close();
+	            }
+        	}
         } finally {
             unlock();
         }
+        
     }
 
     @Override
@@ -232,6 +308,14 @@ public class StandardRecordWriter implements RecordWriter {
 
     @Override
     public void sync() throws IOException {
-        fos.getFD().sync();
+    	if ( tocWriter != null ) {
+    		tocWriter.sync();
+    	}
+    	fos.getFD().sync();
+    }
+    
+    @Override
+    public TocWriter getTocWriter() {
+    	return tocWriter;
     }
 }

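On the write side, the same scheme means: when a block fills, close the current GZIPOutputStream so it emits its trailer, but keep the underlying file open, record the raw offset, and start a new member. A self-contained sketch of that pattern, with a plain list standing in for the TOC writer and a tiny wrapper playing the role of NonCloseableOutputStream:

    import java.io.FileOutputStream;
    import java.io.FilterOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.zip.GZIPOutputStream;

    public class BlockWrite {
        // Swallows close() so that closing the GZIP stream (to flush its trailer)
        // does not close the underlying file.
        static class NonClosing extends FilterOutputStream {
            NonClosing(final OutputStream out) { super(out); }
            @Override public void close() throws IOException { flush(); }
        }

        public static void main(final String[] args) throws IOException {
            final List<Long> blockOffsets = new ArrayList<>(); // stand-in for the TOC
            try (FileOutputStream fos = new FileOutputStream("blocks.gz")) {
                long bytesWritten = 0;
                for (int block = 0; block < 3; block++) {
                    blockOffsets.add(bytesWritten); // record where this GZIP member starts
                    final GZIPOutputStream gz = new GZIPOutputStream(new NonClosing(fos));
                    gz.write(("records for block " + block + "\n").getBytes(StandardCharsets.UTF_8));
                    gz.close(); // writes the GZIP trailer but leaves the file open
                    bytesWritten = fos.getChannel().position();
                }
            }
            System.out.println("block offsets: " + blockOffsets);
        }
    }
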
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
index 4608419..e3bf4d6 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
@@ -16,25 +16,17 @@
  */
 package org.apache.nifi.provenance.lucene;
 
-import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
 import org.apache.nifi.provenance.IndexConfiguration;
 import org.apache.nifi.provenance.PersistentProvenanceRepository;
-import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.provenance.expiration.ExpirationAction;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
-
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,10 +35,12 @@ public class DeleteIndexAction implements ExpirationAction {
     private static final Logger logger = LoggerFactory.getLogger(DeleteIndexAction.class);
     private final PersistentProvenanceRepository repository;
     private final IndexConfiguration indexConfiguration;
+    private final IndexManager indexManager;
 
-    public DeleteIndexAction(final PersistentProvenanceRepository repo, final IndexConfiguration indexConfiguration) {
+    public DeleteIndexAction(final PersistentProvenanceRepository repo, final IndexConfiguration indexConfiguration, final IndexManager indexManager) {
         this.repository = repo;
         this.indexConfiguration = indexConfiguration;
+        this.indexManager = indexManager;
     }
 
     @Override
@@ -55,51 +49,36 @@ public class DeleteIndexAction implements ExpirationAction {
         long numDeleted = 0;
         long maxEventId = -1L;
         try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, repository.getAllLogFiles())) {
-            try {
-                StandardProvenanceEventRecord record;
-                while ((record = reader.nextRecord()) != null) {
-                    numDeleted++;
-
-                    if (record.getEventId() > maxEventId) {
-                        maxEventId = record.getEventId();
-                    }
-                }
-            } catch (final EOFException eof) {
-                // finished reading -- the last record was not completely written out, so it is discarded.
-            }
-        } catch (final EOFException eof) {
-            // no data in file.
-            return expiredFile;
+        	maxEventId = reader.getMaxEventId();
         }
 
         // remove the records from the index
         final List<File> indexDirs = indexConfiguration.getIndexDirectories(expiredFile);
         for (final File indexingDirectory : indexDirs) {
-            try (final Directory directory = FSDirectory.open(indexingDirectory);
-                    final Analyzer analyzer = new StandardAnalyzer()) {
-                IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
-                config.setWriteLockTimeout(300000L);
-
-                Term term = new Term(FieldNames.STORAGE_FILENAME, LuceneUtil.substringBefore(expiredFile.getName(), "."));
+            Term term = new Term(FieldNames.STORAGE_FILENAME, LuceneUtil.substringBefore(expiredFile.getName(), "."));
 
-                boolean deleteDir = false;
-                try (final IndexWriter indexWriter = new IndexWriter(directory, config)) {
-                    indexWriter.deleteDocuments(term);
-                    indexWriter.commit();
-                    final int docsLeft = indexWriter.numDocs();
-                    deleteDir = (docsLeft <= 0);
-                    logger.debug("After expiring {}, there are {} docs left for index {}", expiredFile, docsLeft, indexingDirectory);
-                }
+            boolean deleteDir = false;
+            final IndexWriter writer = indexManager.borrowIndexWriter(indexingDirectory);
+            try {
+                writer.deleteDocuments(term);
+                writer.commit();
+                final int docsLeft = writer.numDocs();
+                deleteDir = (docsLeft <= 0);
+                logger.debug("After expiring {}, there are {} docs left for index {}", expiredFile, docsLeft, indexingDirectory);
+            } finally {
+            	indexManager.returnIndexWriter(indexingDirectory, writer);
+            }
 
-                // we've confirmed that all documents have been removed. Delete the index directory.
-                if (deleteDir) {
-                    indexConfiguration.removeIndexDirectory(indexingDirectory);
-                    deleteDirectory(indexingDirectory);
-                    logger.info("Removed empty index directory {}", indexingDirectory);
-                }
+            // we've confirmed that all documents have been removed. Delete the index directory.
+            if (deleteDir) {
+                indexConfiguration.removeIndexDirectory(indexingDirectory);
+                indexManager.removeIndex(indexingDirectory);
+                
+                deleteDirectory(indexingDirectory);
+                logger.info("Removed empty index directory {}", indexingDirectory);
             }
         }
-
+        
         // Update the minimum index to 1 more than the max Event ID in this file.
         if (maxEventId > -1L) {
             indexConfiguration.setMinIdIndexed(maxEventId + 1L);

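One discipline the reworked action depends on: every borrowIndexWriter must be paired with a returnIndexWriter in a finally block, or the manager's reference count never reaches zero and the writer is leaked. The shape of every call site, as in the diff above (baseName here stands in for the substringBefore computation on the expired file's name):

    final IndexWriter writer = indexManager.borrowIndexWriter(indexingDirectory);
    try {
        // remove every document that was indexed from this expired log file
        writer.deleteDocuments(new Term(FieldNames.STORAGE_FILENAME, baseName));
        writer.commit();
    } finally {
        indexManager.returnIndexWriter(indexingDirectory, writer); // always return, even on failure
    }
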
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
index af5fe50..93978cd 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
@@ -26,20 +26,26 @@ import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.SearchableFields;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
-
+import org.apache.nifi.provenance.toc.TocReader;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.TopDocs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class DocsReader {
-
+	private final Logger logger = LoggerFactory.getLogger(DocsReader.class);
+	
     public DocsReader(final List<File> storageDirectories) {
     }
 
@@ -48,50 +54,86 @@ public class DocsReader {
             return Collections.emptySet();
         }
 
-        final List<Document> docs = new ArrayList<>();
+        final long start = System.nanoTime();
+        final int numDocs = Math.min(topDocs.scoreDocs.length, maxResults);
+        final List<Document> docs = new ArrayList<>(numDocs);
 
-        for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
+        for (final ScoreDoc scoreDoc : topDocs.scoreDocs) {
             final int docId = scoreDoc.doc;
             final Document d = indexReader.document(docId);
             docs.add(d);
+            if ( retrievalCount.incrementAndGet() >= maxResults ) {
+                break;
+            }
         }
 
-        return read(docs, allProvenanceLogFiles, retrievalCount, maxResults);
+        final long readDocuments = System.nanoTime() - start;
+        logger.debug("Reading {} Lucene Documents took {} millis", docs.size(), TimeUnit.NANOSECONDS.toMillis(readDocuments));
+        return read(docs, allProvenanceLogFiles);
+    }
+
+    
+    private long getByteOffset(final Document d, final RecordReader reader) {
+        final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
+        if ( blockField != null ) {
+        	final int blockIndex = blockField.numericValue().intValue();
+        	final TocReader tocReader = reader.getTocReader();
+        	return tocReader.getBlockOffset(blockIndex);
+        }
+        
+    	return d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
+    }
+    
+    
+    private ProvenanceEventRecord getRecord(final Document d, final RecordReader reader) throws IOException {
+    	IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
+    	if ( blockField == null ) {
+    		reader.skipTo(getByteOffset(d, reader));
+    	} else {
+    		reader.skipToBlock(blockField.numericValue().intValue());
+    	}
+    	
+        StandardProvenanceEventRecord record;
+        while ( (record = reader.nextRecord()) != null) {
+        	IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());
+        	if ( idField == null || idField.numericValue().longValue() == record.getEventId() ) {
+        		break;
+        	}
+        }
+        
+        if ( record == null ) {
+        	throw new IOException("Failed to find Provenance Event " + d);
+        } else {
+        	return record;
+        }
     }
+    
 
-    public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException {
+    public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles) throws IOException {
         LuceneUtil.sortDocsForRetrieval(docs);
 
         RecordReader reader = null;
         String lastStorageFilename = null;
-        long lastByteOffset = 0L;
         final Set<ProvenanceEventRecord> matchingRecords = new LinkedHashSet<>();
 
+        final long start = System.nanoTime();
+        int logFileCount = 0;
         try {
             for (final Document d : docs) {
                 final String storageFilename = d.getField(FieldNames.STORAGE_FILENAME).stringValue();
-                final long byteOffset = d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
-
+                
                 try {
-                    if (reader != null && storageFilename.equals(lastStorageFilename) && byteOffset > lastByteOffset) {
-                        // Still the same file and the offset is downstream.
-                        try {
-                            reader.skipTo(byteOffset);
-                            final StandardProvenanceEventRecord record = reader.nextRecord();
-                            matchingRecords.add(record);
-                            if (retrievalCount.incrementAndGet() >= maxResults) {
-                                break;
-                            }
-                        } catch (final IOException e) {
-                            throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository");
-                        }
-
+                    if (reader != null && storageFilename.equals(lastStorageFilename)) {
+                       	matchingRecords.add(getRecord(d, reader));
                     } else {
+                    	logger.debug("Opening log file {}", storageFilename);
+                    	
+                    	logFileCount++;
                         if (reader != null) {
                             reader.close();
                         }
 
-                        final List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
+                        List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
                         if (potentialFiles.isEmpty()) {
                             throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository");
                         }
@@ -104,13 +146,7 @@ public class DocsReader {
                             reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles);
 
                             try {
-                                reader.skip(byteOffset);
-
-                                final StandardProvenanceEventRecord record = reader.nextRecord();
-                                matchingRecords.add(record);
-                                if (retrievalCount.incrementAndGet() >= maxResults) {
-                                    break;
-                                }
+                               	matchingRecords.add(getRecord(d, reader));
                             } catch (final IOException e) {
                                 throw new IOException("Failed to retrieve record from Provenance File " + file + " due to " + e, e);
                             }
@@ -118,7 +154,6 @@ public class DocsReader {
                     }
                 } finally {
                     lastStorageFilename = storageFilename;
-                    lastByteOffset = byteOffset;
                 }
             }
         } finally {
@@ -127,6 +162,9 @@ public class DocsReader {
             }
         }
 
+        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        logger.debug("Took {} ms to read {} events from {} prov log files", millis, matchingRecords.size(), logFileCount);
+
         return matchingRecords;
     }
 

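Condensed, the retrieval decision in getRecord above is: prefer the block index when the Lucene document carries one (events indexed by this commit), and fall back to the absolute byte offset for documents written under the old format:

    final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
    if (blockField != null) {
        // new format: jump to the compression block that holds the event
        reader.skipToBlock(blockField.numericValue().intValue());
    } else {
        // old format: the index stored an absolute byte offset into the log
        reader.skipTo(d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue());
    }
    // a block holds many events, so the real code then scans forward
    // with nextRecord() until the event ID matches the document's.
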
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java
index 6afc193..90a73f4 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/FieldNames.java
@@ -20,4 +20,5 @@ public class FieldNames {
 
     public static final String STORAGE_FILENAME = "storage-filename";
     public static final String STORAGE_FILE_OFFSET = "storage-fileOffset";
+    public static final String BLOCK_INDEX = "block-index";
 }

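BLOCK_INDEX is the field that ties the Lucene index to the new table of contents: for each event, the indexing side records which compression block holds it. With the Lucene 4.x API this repository uses, adding the field to a document would look roughly like the following sketch (the corresponding IndexingAction change appears in a later part of this commit series):

    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field.Store;
    import org.apache.lucene.document.IntField;

    final Document doc = new Document();
    // stored so that DocsReader can read the block number back out of a search hit
    doc.add(new IntField(FieldNames.BLOCK_INDEX, blockIndex, Store.YES));
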

[2/7] incubator-nifi git commit: NIFI-527: Refactored the serialization format of the persistent prov repo to use compression blocks and index them

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
new file mode 100644
index 0000000..19c5b75
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.lucene;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.nifi.provenance.IndexConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IndexManager implements Closeable {
+	private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);
+	private final IndexConfiguration indexConfig;
+	
+	private final Lock lock = new ReentrantLock();
+	private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
+	private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
+	
+	public IndexManager(final IndexConfiguration indexConfig) {
+		this.indexConfig = indexConfig;
+	}
+	
+	public void removeIndex(final File indexDirectory) {
+		final File absoluteFile = indexDirectory.getAbsoluteFile();
+
+		lock.lock();
+		try {
+			final IndexWriterCount count = writerCounts.remove(absoluteFile);
+			if ( count != null ) {
+				try {
+					count.close();
+				} catch (final IOException ioe) {
+					logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile);
+					if ( logger.isDebugEnabled() ) {
+						logger.warn("", ioe);
+					}
+				}
+			}
+			
+			for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) {
+				for ( final ActiveIndexSearcher searcher : searcherList ) {
+					try {
+						searcher.close();
+					} catch (final IOException ioe) {
+						logger.warn("Failed to close Index Searcher {} for {} due to {}", 
+								searcher.getSearcher(), absoluteFile, ioe);
+						if ( logger.isDebugEnabled() ) {
+							logger.warn("", ioe);
+						}
+					}
+				}
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+	
+	public IndexWriter borrowIndexWriter(final File journalFile) throws IOException {
+		final File indexingDirectory = indexConfig.getWritableIndexDirectory(journalFile);
+		final File absoluteFile = indexingDirectory.getAbsoluteFile();
+
+		lock.lock();
+		try {
+			IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+			if ( writerCount == null ) {
+				
+				final List<Closeable> closeables = new ArrayList<>();
+                final Directory directory = FSDirectory.open(indexingDirectory);
+                closeables.add(directory);
+                
+                try {
+                	final Analyzer analyzer = new StandardAnalyzer();
+                	closeables.add(analyzer);
+                	
+                    final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
+                    config.setWriteLockTimeout(300000L);
+
+                    final IndexWriter indexWriter = new IndexWriter(directory, config);
+                    writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1);
+                } catch (final IOException ioe) {
+                	for ( final Closeable closeable : closeables ) {
+                		try {
+                			closeable.close();
+                		} catch (final IOException ioe2) {
+                			ioe.addSuppressed(ioe2);
+                		}
+                	}
+                	
+                	throw ioe;
+                }
+                
+                writerCounts.put(absoluteFile, writerCount);
+			} else {
+				writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+						writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+			}
+			
+			return writerCount.getWriter();
+		} finally {
+			lock.unlock();
+		}
+	}
+	
+	public void returnIndexWriter(final File journalFile, final IndexWriter writer) {
+		final File indexingDirectory = indexConfig.getWritableIndexDirectory(journalFile);
+		final File absoluteFile = indexingDirectory.getAbsoluteFile();
+		
+		lock.lock();
+		try {
+			IndexWriterCount count = writerCounts.remove(absoluteFile);
+			
+			try {
+				if ( count == null ) {
+					logger.warn("Index Writer {} was returned to IndexManager for Journal File {}, but this writer is not known. This could potentially lead to a resource leak", writer, journalFile);
+					writer.close();
+				} else if ( count.getCount() <= 1 ) {
+					// we are finished with this writer.
+					count.close();
+				} else {
+					// decrement the count.
+					writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
+				}
+			} catch (final IOException ioe) {
+				logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
+				if ( logger.isDebugEnabled() ) {
+					logger.warn("", ioe);
+				}
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	
+	public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
+		final File absoluteFile = indexDir.getAbsoluteFile();
+		
+		lock.lock();
+		try {
+			// check if we already have a reader cached.
+			List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
+			if ( currentlyCached == null ) {
+				currentlyCached = new ArrayList<>();
+				activeSearchers.put(absoluteFile, currentlyCached);
+			} else {
+				for ( final ActiveIndexSearcher searcher : currentlyCached ) {
+					if ( searcher.isCache() ) {
+						return searcher.getSearcher();
+					}
+				}
+			}
+			
+			IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+			if ( writerCount == null ) {
+				final Directory directory = FSDirectory.open(absoluteFile);
+				
+				try {
+					final DirectoryReader directoryReader = DirectoryReader.open(directory);
+					final IndexSearcher searcher = new IndexSearcher(directoryReader);
+					
+					// we want to cache the searcher that we create, since it's just a reader.
+					final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true);
+					currentlyCached.add(cached);
+					
+					return cached.getSearcher();
+				} catch (final IOException e) {
+					try {
+						directory.close();
+					} catch (final IOException ioe) {
+						e.addSuppressed(ioe);
+					}
+					
+					throw e;
+				}
+			} else {
+				// increment the writer count to ensure that it's kept open.
+				writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+						writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+				
+				// create a new Index Searcher from the writer so that we don't have an issue with trying
+				// to read from a directory that's locked. If we get the "no segments* file found" with
+				// Lucene, this indicates that an IndexWriter already has the directory open.
+				final IndexWriter writer = writerCount.getWriter();
+				final DirectoryReader directoryReader = DirectoryReader.open(writer, false);
+				final IndexSearcher searcher = new IndexSearcher(directoryReader);
+				
+				// we don't want to cache this searcher because it's based on a writer, so we want to get
+				// new values the next time that we search.
+				final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false);
+				
+				currentlyCached.add(activeSearcher);
+				return activeSearcher.getSearcher();
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+	
+	
+	public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
+		final File absoluteFile = indexDirectory.getAbsoluteFile();
+		
+		lock.lock();
+		try {
+			// check if we already have a reader cached.
+			List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
+			if ( currentlyCached == null ) {
+				logger.warn("Received Index Searcher for {} but no searcher is known for that directory; this could "
+						+ "result in a resource leak", indexDirectory);
+				return;
+			}
+			
+			final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator();
+			while (itr.hasNext()) {
+				final ActiveIndexSearcher activeSearcher = itr.next();
+				if ( activeSearcher.getSearcher().equals(searcher) ) {
+					if ( activeSearcher.isCache() ) {
+						// the searcher is cached. Just leave it open.
+						return;
+					} else {
+						// searcher is not cached. It was created from a writer, and we want
+						// the newest updates the next time that we get a searcher, so we will
+						// go ahead and close this one out.
+						itr.remove();
+						
+						// decrement the writer count because we incremented it when creating the searcher
+						final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+						if ( writerCount != null ) {
+							if ( writerCount.getCount() <= 1 ) {
+								try {
+									writerCount.close();
+								} catch (final IOException ioe) {
+									logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
+									if ( logger.isDebugEnabled() ) {
+										logger.warn("", ioe);
+									}
+								}
+							} else {
+								writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+									writerCount.getAnalyzer(), writerCount.getDirectory(), 
+									writerCount.getCount() - 1));
+							}
+						}
+
+						try {
+							activeSearcher.close();
+						} catch (final IOException ioe) {
+							logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
+							if ( logger.isDebugEnabled() ) {
+								logger.warn("", ioe);
+							}
+						}
+					}
+				}
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+	
+	@Override
+	public void close() throws IOException {
+		lock.lock();
+		try {
+			IOException ioe = null;
+			
+			for ( final IndexWriterCount count : writerCounts.values() ) {
+				try {
+					count.close();
+				} catch (final IOException e) {
+					if ( ioe == null ) {
+						ioe = e;
+					} else {
+						ioe.addSuppressed(e);
+					}
+				}
+			}
+			
+			for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) {
+				for (final ActiveIndexSearcher searcher : searcherList) {
+					try {
+						searcher.close();
+					} catch (final IOException e) {
+						if ( ioe == null ) {
+							ioe = e;
+						} else {
+							ioe.addSuppressed(e);
+						}
+					}
+				}
+			}
+			
+			if ( ioe != null ) {
+				throw ioe;
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	
+	private static void close(final Closeable... closeables) throws IOException {
+		IOException ioe = null;
+		for ( final Closeable closeable : closeables ) {
+			if ( closeable == null ) {
+				continue;
+			}
+			
+			try {
+				closeable.close();
+			} catch (final IOException e) {
+				if ( ioe == null ) {
+					ioe = e;
+				} else {
+					ioe.addSuppressed(e);
+				}
+			}
+		}
+		
+		if ( ioe != null ) {
+			throw ioe;
+		}
+	}
+	
+	
+	private static class ActiveIndexSearcher implements Closeable {
+		private final IndexSearcher searcher;
+		private final DirectoryReader directoryReader;
+		private final Directory directory;
+		private final boolean cache;
+		
+		public ActiveIndexSearcher(IndexSearcher searcher, DirectoryReader directoryReader, 
+				Directory directory, final boolean cache) {
+			this.searcher = searcher;
+			this.directoryReader = directoryReader;
+			this.directory = directory;
+			this.cache = cache;
+		}
+
+		public boolean isCache() {
+			return cache;
+		}
+
+		public IndexSearcher getSearcher() {
+			return searcher;
+		}
+		
+		@Override
+		public void close() throws IOException {
+			IndexManager.close(directoryReader, directory);
+		}
+	}
+	
+	
+	private static class IndexWriterCount implements Closeable {
+		private final IndexWriter writer;
+		private final Analyzer analyzer;
+		private final Directory directory;
+		private final int count;
+		
+		public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
+			this.writer = writer;
+			this.analyzer = analyzer;
+			this.directory = directory;
+			this.count = count;
+		}
+
+		public Analyzer getAnalyzer() {
+			return analyzer;
+		}
+
+		public Directory getDirectory() {
+			return directory;
+		}
+
+		public IndexWriter getWriter() {
+			return writer;
+		}
+
+		public int getCount() {
+			return count;
+		}
+
+		@Override
+		public void close() throws IOException {
+			IndexManager.close(writer, analyzer, directory);
+		}
+	}
+
+}
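
The borrow/return pair above is intended to be used with a simple try/finally discipline, so that cached searchers stay open and writer-backed searchers are closed with their writer counts decremented. A minimal usage sketch, assuming only the IndexManager methods shown in this patch (the no-arg constructor, directory path, and query logic are hypothetical):

    import java.io.File;
    import java.io.IOException;

    import org.apache.lucene.search.IndexSearcher;
    import org.apache.nifi.provenance.lucene.IndexManager;

    public class IndexManagerUsageSketch {
        public static void main(final String[] args) throws IOException {
            final IndexManager indexManager = new IndexManager();    // assumes a no-arg constructor
            final File indexDirectory = new File("index-00001");     // hypothetical index directory

            final IndexSearcher searcher = indexManager.borrowIndexSearcher(indexDirectory);
            try {
                // ... run Lucene queries against the borrowed searcher ...
            } finally {
                // always return the searcher: cached searchers are left open, while
                // writer-backed searchers are closed and the writer count decremented
                indexManager.returnIndexSearcher(indexDirectory, searcher);
            }

            indexManager.close();
        }
    }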

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
index e2854c3..f6723cf 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
@@ -21,27 +21,29 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Date;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.nifi.provenance.PersistentProvenanceRepository;
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.provenance.StandardQueryResult;
-
-import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexNotFoundException;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.TopDocs;
-import org.apache.lucene.store.FSDirectory;
+import org.apache.nifi.provenance.PersistentProvenanceRepository;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.StandardQueryResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class IndexSearch {
-
+	private final Logger logger = LoggerFactory.getLogger(IndexSearch.class);
     private final PersistentProvenanceRepository repository;
     private final File indexDirectory;
+    private final IndexManager indexManager;
 
-    public IndexSearch(final PersistentProvenanceRepository repo, final File indexDirectory) {
+    public IndexSearch(final PersistentProvenanceRepository repo, final File indexDirectory, final IndexManager indexManager) {
         this.repository = repo;
         this.indexDirectory = indexDirectory;
+        this.indexManager = indexManager;
     }
 
     public StandardQueryResult search(final org.apache.nifi.provenance.search.Query provenanceQuery, final AtomicInteger retrievedCount) throws IOException {
@@ -55,29 +57,43 @@ public class IndexSearch {
         final StandardQueryResult sqr = new StandardQueryResult(provenanceQuery, 1);
         final Set<ProvenanceEventRecord> matchingRecords;
 
-        try (final DirectoryReader directoryReader = DirectoryReader.open(FSDirectory.open(indexDirectory))) {
-            final IndexSearcher searcher = new IndexSearcher(directoryReader);
-
-            if (provenanceQuery.getEndDate() == null) {
-                provenanceQuery.setEndDate(new Date());
-            }
-            final Query luceneQuery = LuceneUtil.convertQuery(provenanceQuery);
+        if (provenanceQuery.getEndDate() == null) {
+            provenanceQuery.setEndDate(new Date());
+        }
+        final Query luceneQuery = LuceneUtil.convertQuery(provenanceQuery);
 
-            TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults());
+        final long start = System.nanoTime();
+        final IndexSearcher searcher = indexManager.borrowIndexSearcher(indexDirectory);
+        try {
+            final long searchStartNanos = System.nanoTime();
+            final long openSearcherNanos = searchStartNanos - start;
+            
+            final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults());
+            final long finishSearch = System.nanoTime();
+            final long searchNanos = finishSearch - searchStartNanos;
+            
+            logger.debug("Searching {} took {} millis; opening searcher took {} millis", this, 
+            		TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
+            
             if (topDocs.totalHits == 0) {
                 sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
                 return sqr;
             }
 
             final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories());
-            matchingRecords = docsReader.read(topDocs, directoryReader, repository.getAllLogFiles(), retrievedCount, provenanceQuery.getMaxResults());
-
+            matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount, provenanceQuery.getMaxResults());
+            
+            final long readRecordsNanos = System.nanoTime() - finishSearch;
+            logger.debug("Reading {} records took {} millis for {}", matchingRecords.size(), TimeUnit.NANOSECONDS.toMillis(readRecordsNanos), this);
+            
             sqr.update(matchingRecords, topDocs.totalHits);
             return sqr;
         } catch (final IndexNotFoundException e) {
             // nothing has been indexed yet.
             sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
             return sqr;
+        } finally {
+        	indexManager.returnIndexSearcher(indexDirectory, searcher);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
index 214267a..5e87913 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
@@ -24,27 +24,27 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.provenance.IndexConfiguration;
-import org.apache.nifi.provenance.PersistentProvenanceRepository;
-import org.apache.nifi.provenance.ProvenanceEventType;
-import org.apache.nifi.provenance.SearchableFields;
-import org.apache.nifi.provenance.StandardProvenanceEventRecord;
-import org.apache.nifi.provenance.rollover.RolloverAction;
-import org.apache.nifi.provenance.search.SearchableField;
-import org.apache.nifi.provenance.serialization.RecordReader;
-import org.apache.nifi.provenance.serialization.RecordReaders;
-
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.IntField;
 import org.apache.lucene.document.LongField;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.provenance.IndexConfiguration;
+import org.apache.nifi.provenance.PersistentProvenanceRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.SearchableFields;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.rollover.RolloverAction;
+import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.provenance.serialization.RecordReader;
+import org.apache.nifi.provenance.serialization.RecordReaders;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,15 +72,93 @@ public class IndexingAction implements RolloverAction {
         doc.add(new StringField(field.getSearchableFieldName(), value.toLowerCase(), store));
     }
 
+    
+    public void index(final StandardProvenanceEventRecord record, final IndexWriter indexWriter, final Integer blockIndex) throws IOException {
+        final Map<String, String> attributes = record.getAttributes();
+
+        final Document doc = new Document();
+        addField(doc, SearchableFields.FlowFileUUID, record.getFlowFileUuid(), Store.NO);
+        addField(doc, SearchableFields.Filename, attributes.get(CoreAttributes.FILENAME.key()), Store.NO);
+        addField(doc, SearchableFields.ComponentID, record.getComponentId(), Store.NO);
+        addField(doc, SearchableFields.AlternateIdentifierURI, record.getAlternateIdentifierUri(), Store.NO);
+        addField(doc, SearchableFields.EventType, record.getEventType().name(), Store.NO);
+        addField(doc, SearchableFields.Relationship, record.getRelationship(), Store.NO);
+        addField(doc, SearchableFields.Details, record.getDetails(), Store.NO);
+        addField(doc, SearchableFields.ContentClaimSection, record.getContentClaimSection(), Store.NO);
+        addField(doc, SearchableFields.ContentClaimContainer, record.getContentClaimContainer(), Store.NO);
+        addField(doc, SearchableFields.ContentClaimIdentifier, record.getContentClaimIdentifier(), Store.NO);
+        addField(doc, SearchableFields.SourceQueueIdentifier, record.getSourceQueueIdentifier(), Store.NO);
+
+        if (nonAttributeSearchableFields.contains(SearchableFields.TransitURI)) {
+            addField(doc, SearchableFields.TransitURI, record.getTransitUri(), Store.NO);
+        }
+
+        for (final SearchableField searchableField : attributeSearchableFields) {
+            addField(doc, searchableField, attributes.get(searchableField.getSearchableFieldName()), Store.NO);
+        }
+
+        final String storageFilename = LuceneUtil.substringBefore(record.getStorageFilename(), ".");
+
+        // Index the fields that we always index (unless there's nothing else to index at all)
+        if (!doc.getFields().isEmpty()) {
+            doc.add(new LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), record.getLineageStartDate(), Store.NO));
+            doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), record.getEventTime(), Store.NO));
+            doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), record.getFileSize(), Store.NO));
+            doc.add(new StringField(FieldNames.STORAGE_FILENAME, storageFilename, Store.YES));
+            
+            if ( blockIndex == null ) {
+            	doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset(), Store.YES));
+            } else {
+	            doc.add(new IntField(FieldNames.BLOCK_INDEX, blockIndex, Store.YES));
+	            doc.add(new LongField(SearchableFields.Identifier.getSearchableFieldName(), record.getEventId(), Store.YES));
+            }
+            
+            for (final String lineageIdentifier : record.getLineageIdentifiers()) {
+                addField(doc, SearchableFields.LineageIdentifier, lineageIdentifier, Store.NO);
+            }
+
+            // If the event is a FORK, CLONE, or REPLAY, add the FlowFile UUID for all child UUIDs;
+            // if it is a JOIN, add the FlowFile UUID for all parent UUIDs.
+            if (record.getEventType() == ProvenanceEventType.FORK || record.getEventType() == ProvenanceEventType.CLONE || record.getEventType() == ProvenanceEventType.REPLAY) {
+                for (final String uuid : record.getChildUuids()) {
+                    if (!uuid.equals(record.getFlowFileUuid())) {
+                        addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO);
+                    }
+                }
+            } else if (record.getEventType() == ProvenanceEventType.JOIN) {
+                for (final String uuid : record.getParentUuids()) {
+                    if (!uuid.equals(record.getFlowFileUuid())) {
+                        addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO);
+                    }
+                }
+            } else if (record.getEventType() == ProvenanceEventType.RECEIVE && record.getSourceSystemFlowFileIdentifier() != null) {
+                // If we get a RECEIVE with a Source System FlowFile Identifier, we also index the UUID
+                // that the Source System uses to refer to the data.
+                final String sourceIdentifier = record.getSourceSystemFlowFileIdentifier();
+                final String sourceFlowFileUUID;
+                final int lastColon = sourceIdentifier.lastIndexOf(":");
+                if (lastColon > -1 && lastColon < sourceIdentifier.length() - 2) {
+                    sourceFlowFileUUID = sourceIdentifier.substring(lastColon + 1);
+                } else {
+                    sourceFlowFileUUID = null;
+                }
+
+                if (sourceFlowFileUUID != null) {
+                    addField(doc, SearchableFields.FlowFileUUID, sourceFlowFileUUID, Store.NO);
+                }
+            }
+
+            indexWriter.addDocument(doc);
+        }
+    }
+    
     @Override
-    @SuppressWarnings("deprecation")
     public File execute(final File fileRolledOver) throws IOException {
         final File indexingDirectory = indexConfiguration.getWritableIndexDirectory(fileRolledOver);
         int indexCount = 0;
         long maxId = -1L;
 
         try (final Directory directory = FSDirectory.open(indexingDirectory);
-                final Analyzer analyzer = new StandardAnalyzer(LuceneUtil.LUCENE_VERSION)) {
+                final Analyzer analyzer = new StandardAnalyzer()) {
 
             final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
             config.setWriteLockTimeout(300000L);
@@ -89,6 +167,13 @@ public class IndexingAction implements RolloverAction {
                     final RecordReader reader = RecordReaders.newRecordReader(fileRolledOver, repository.getAllLogFiles())) {
                 StandardProvenanceEventRecord record;
                 while (true) {
+                	final Integer blockIndex;
+                	if ( reader.isBlockIndexAvailable() ) {
+                		blockIndex = reader.getBlockIndex();
+                	} else {
+                		blockIndex = null;
+                	}
+                	
                     try {
                         record = reader.nextRecord();
                     } catch (final EOFException eof) {
@@ -104,76 +189,8 @@ public class IndexingAction implements RolloverAction {
 
                     maxId = record.getEventId();
 
-                    final Map<String, String> attributes = record.getAttributes();
-
-                    final Document doc = new Document();
-                    addField(doc, SearchableFields.FlowFileUUID, record.getFlowFileUuid(), Store.NO);
-                    addField(doc, SearchableFields.Filename, attributes.get(CoreAttributes.FILENAME.key()), Store.NO);
-                    addField(doc, SearchableFields.ComponentID, record.getComponentId(), Store.NO);
-                    addField(doc, SearchableFields.AlternateIdentifierURI, record.getAlternateIdentifierUri(), Store.NO);
-                    addField(doc, SearchableFields.EventType, record.getEventType().name(), Store.NO);
-                    addField(doc, SearchableFields.Relationship, record.getRelationship(), Store.NO);
-                    addField(doc, SearchableFields.Details, record.getDetails(), Store.NO);
-                    addField(doc, SearchableFields.ContentClaimSection, record.getContentClaimSection(), Store.NO);
-                    addField(doc, SearchableFields.ContentClaimContainer, record.getContentClaimContainer(), Store.NO);
-                    addField(doc, SearchableFields.ContentClaimIdentifier, record.getContentClaimIdentifier(), Store.NO);
-                    addField(doc, SearchableFields.SourceQueueIdentifier, record.getSourceQueueIdentifier(), Store.NO);
-
-                    if (nonAttributeSearchableFields.contains(SearchableFields.TransitURI)) {
-                        addField(doc, SearchableFields.TransitURI, record.getTransitUri(), Store.NO);
-                    }
-
-                    for (final SearchableField searchableField : attributeSearchableFields) {
-                        addField(doc, searchableField, attributes.get(searchableField.getSearchableFieldName()), Store.NO);
-                    }
-
-                    final String storageFilename = LuceneUtil.substringBefore(record.getStorageFilename(), ".");
-
-                    // Index the fields that we always index (unless there's nothing else to index at all)
-                    if (!doc.getFields().isEmpty()) {
-                        doc.add(new LongField(SearchableFields.LineageStartDate.getSearchableFieldName(), record.getLineageStartDate(), Store.NO));
-                        doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), record.getEventTime(), Store.NO));
-                        doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), record.getFileSize(), Store.NO));
-                        doc.add(new StringField(FieldNames.STORAGE_FILENAME, storageFilename, Store.YES));
-                        doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset(), Store.YES));
-
-                        for (final String lineageIdentifier : record.getLineageIdentifiers()) {
-                            addField(doc, SearchableFields.LineageIdentifier, lineageIdentifier, Store.NO);
-                        }
-
-                        // If it's event is a FORK, or JOIN, add the FlowFileUUID for all child/parent UUIDs.
-                        if (record.getEventType() == ProvenanceEventType.FORK || record.getEventType() == ProvenanceEventType.CLONE || record.getEventType() == ProvenanceEventType.REPLAY) {
-                            for (final String uuid : record.getChildUuids()) {
-                                if (!uuid.equals(record.getFlowFileUuid())) {
-                                    addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO);
-                                }
-                            }
-                        } else if (record.getEventType() == ProvenanceEventType.JOIN) {
-                            for (final String uuid : record.getParentUuids()) {
-                                if (!uuid.equals(record.getFlowFileUuid())) {
-                                    addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO);
-                                }
-                            }
-                        } else if (record.getEventType() == ProvenanceEventType.RECEIVE && record.getSourceSystemFlowFileIdentifier() != null) {
-                            // If we get a receive with a Source System FlowFile Identifier, we add another Document that shows the UUID
-                            // that the Source System uses to refer to the data.
-                            final String sourceIdentifier = record.getSourceSystemFlowFileIdentifier();
-                            final String sourceFlowFileUUID;
-                            final int lastColon = sourceIdentifier.lastIndexOf(":");
-                            if (lastColon > -1 && lastColon < sourceIdentifier.length() - 2) {
-                                sourceFlowFileUUID = sourceIdentifier.substring(lastColon + 1);
-                            } else {
-                                sourceFlowFileUUID = null;
-                            }
-
-                            if (sourceFlowFileUUID != null) {
-                                addField(doc, SearchableFields.FlowFileUUID, sourceFlowFileUUID, Store.NO);
-                            }
-                        }
-
-                        indexWriter.addDocument(doc);
-                        indexCount++;
-                    }
+                    index(record, indexWriter, blockIndex);
+                    indexCount++;
                 }
 
                 indexWriter.commit();
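
To make the two document layouts concrete: a hit whose Document carries a BLOCK_INDEX field is resolved by seeking to that compression block and scanning forward for the event ID, while a hit carrying only STORAGE_FILE_OFFSET is resolved with a direct byte seek. A hedged sketch of that lookup against the RecordReader API introduced in this patch (how the field values reach this method is illustrative):

    import java.io.IOException;

    import org.apache.nifi.provenance.StandardProvenanceEventRecord;
    import org.apache.nifi.provenance.serialization.RecordReader;

    public class EventLookupSketch {
        // blockIndex/eventId (new layout) or byteOffset (old layout) come from the matching Lucene Document
        public static StandardProvenanceEventRecord find(final RecordReader reader, final Integer blockIndex,
                final long eventId, final long byteOffset) throws IOException {
            if (blockIndex != null) {
                // block-indexed journal: jump to the start of the compression block,
                // then scan for the matching event ID
                reader.skipToBlock(blockIndex);
                StandardProvenanceEventRecord record;
                while ((record = reader.nextRecord()) != null) {
                    if (record.getEventId() == eventId) {
                        return record;
                    }
                }
                return null;
            }

            // legacy journal: the Document stores the exact byte offset of the event
            reader.skipTo(byteOffset);
            return reader.nextRecord();
        }
    }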

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
index a7076d5..59dc10b 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
@@ -27,8 +27,8 @@ import java.util.List;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.provenance.SearchableFields;
 import org.apache.nifi.provenance.search.SearchTerm;
-
 import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.BooleanClause;
 import org.apache.lucene.search.BooleanClause.Occur;
@@ -78,7 +78,16 @@ public class LuceneUtil {
         final String searchString = baseName + ".";
         for (final Path path : allProvenanceLogs) {
             if (path.toFile().getName().startsWith(searchString)) {
-                matchingFiles.add(path.toFile());
+            	final File file = path.toFile();
+            	if ( file.exists() ) {
+            		matchingFiles.add(file);
+            	} else {
+            		final File dir = file.getParentFile();
+            		final File gzFile = new File(dir, file.getName() + ".gz");
+            		if ( gzFile.exists() ) {
+            			matchingFiles.add(gzFile);
+            		}
+            	}
             }
         }
 
@@ -132,6 +141,19 @@ public class LuceneUtil {
                     return filenameComp;
                 }
 
+                final IndexableField blockIndex1 = o1.getField(FieldNames.BLOCK_INDEX);
+                final IndexableField blockIndex2 = o2.getField(FieldNames.BLOCK_INDEX);
+                if ( blockIndex1 != null && blockIndex2 != null ) {
+                	final int blockIndexResult = Long.compare(blockIndex1.numericValue().longValue(), blockIndex2.numericValue().longValue());
+                	if ( blockIndexResult != 0 ) {
+                		return blockIndexResult;
+                	}
+                	
+                	final long eventId1 = o1.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
+                	final long eventId2 = o2.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
+                	return Long.compare(eventId1, eventId2);
+                }
+                
                 final long offset1 = o1.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
                 final long offset2 = o2.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
                 return Long.compare(offset1, offset2);
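
The comparator therefore orders hits by storage filename, then by block index and event ID when both documents carry a block index, and otherwise by byte offset. A small standalone sketch of the same three-level ordering on a plain value type (DocKey is hypothetical; it simply mirrors the fields the comparator inspects):

    // Hypothetical value type mirroring the Document fields compared above.
    final class DocKey {
        final String storageFilename;
        final Integer blockIndex;   // null for journals written without a Table of Contents
        final long eventId;         // used only when blockIndex is present
        final long byteOffset;      // used only when blockIndex is absent

        DocKey(final String storageFilename, final Integer blockIndex, final long eventId, final long byteOffset) {
            this.storageFilename = storageFilename;
            this.blockIndex = blockIndex;
            this.eventId = eventId;
            this.byteOffset = byteOffset;
        }

        static int compare(final DocKey o1, final DocKey o2) {
            final int filenameComp = o1.storageFilename.compareTo(o2.storageFilename);
            if (filenameComp != 0) {
                return filenameComp;
            }
            if (o1.blockIndex != null && o2.blockIndex != null) {
                final int blockComp = Integer.compare(o1.blockIndex, o2.blockIndex);
                if (blockComp != 0) {
                    return blockComp;
                }
                return Long.compare(o1.eventId, o2.eventId);
            }
            return Long.compare(o1.byteOffset, o2.byteOffset);
        }
    }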

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
index 862bc2b..8bdc88a 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
@@ -20,12 +20,79 @@ import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.toc.TocReader;
 
 public interface RecordReader extends Closeable {
 
+	/**
+	 * Returns the next record in the reader, or <code>null</code> if there is no more data available.
+	 * @return the next record, or <code>null</code> if no more records are available
+	 * @throws IOException if unable to read the next record from the underlying stream
+	 */
     StandardProvenanceEventRecord nextRecord() throws IOException;
 
+    /**
+     * Skips the specified number of bytes
+     * @param bytesToSkip the number of bytes to skip
+     * @throws IOException if unable to skip the specified number of bytes
+     */
     void skip(long bytesToSkip) throws IOException;
 
+    /**
+     * Skips to the specified byte offset in the underlying stream.
+     * @param position the byte offset to skip to
+     * @throws IOException if the underlying stream throws IOException, or if the reader has already
+     * passed the specified byte offset
+     */
     void skipTo(long position) throws IOException;
+    
+    /**
+     * Skips to the specified compression block
+     * 
+     * @param blockIndex the index of the compression block to skip to
+     * @throws IOException if the underlying stream throws IOException, or if the reader has already
+     * read past the specified compression block index
+     * @throws IllegalStateException if the RecordReader does not have a Table of Contents associated with it
+     */
+    void skipToBlock(int blockIndex) throws IOException;
+    
+    /**
+     * Returns the block index that the Reader is currently reading from.
+     * Note that the block index is incremented at the beginning of the {@link #nextRecord()}
+     * method. This means that this method will return the block from which the previous record was read, 
+     * method. As a result, when calling {@link #nextRecord()} repeatedly, this method returns the block
+     * from which the previous record was read, not the block from which the next record will be read.
+     * @return the current block index
+    int getBlockIndex();
+    
+    /**
+     * Returns <code>true</code> if the compression block index is available. It will be available
+     * if and only if the reader is created with a Table of Contents
+     * 
+     * @return <code>true</code> if the block index is available, <code>false</code> otherwise
+     */
+    boolean isBlockIndexAvailable();
+    
+    /**
+     * Returns the {@link TocReader} that is used to keep track of compression blocks, if one exists,
+     * <code>null</code> otherwise
+     * @return
+     * @return the TocReader, or <code>null</code> if the reader has no Table of Contents
+    TocReader getTocReader();
+    
+    /**
+     * Returns the number of bytes that have been consumed from the stream (read or skipped).
+     * @return
+     * @return the number of bytes consumed from the stream so far
+    long getBytesConsumed();
+    
+    /**
+     * Returns the ID of the last event in this record reader, or -1 if the reader has no records or
+     * has already read through all records. Note: This method will consume the stream until the end,
+     * so no more records will be available on this reader after calling this method.
+     * 
+     * @return the ID of the last event, or -1 if no records remain
+     * @throws IOException if unable to read from the underlying stream
+     */
+    long getMaxEventId() throws IOException;
 }
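
A minimal sketch of consuming a reader built against this interface, using the RecordReaders factory updated later in this patch (the journal file name is hypothetical; the factory permits a null provenance-log collection):

    import java.io.File;
    import java.io.IOException;

    import org.apache.nifi.provenance.StandardProvenanceEventRecord;
    import org.apache.nifi.provenance.serialization.RecordReader;
    import org.apache.nifi.provenance.serialization.RecordReaders;

    public class RecordReaderSketch {
        public static void main(final String[] args) throws IOException {
            final File journal = new File("12345.prov.gz");    // hypothetical journal file
            try (final RecordReader reader = RecordReaders.newRecordReader(journal, null)) {
                StandardProvenanceEventRecord record;
                while ((record = reader.nextRecord()) != null) {
                    final String block = reader.isBlockIndexAvailable()
                            ? String.valueOf(reader.getBlockIndex()) : "n/a";
                    System.out.println("event " + record.getEventId() + " read from block " + block);
                }
            }
        }
    }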

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
index f902b92..dff281c 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.provenance.serialization;
 
-import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -24,47 +23,90 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Path;
 import java.util.Collection;
-import java.util.zip.GZIPInputStream;
 
-import org.apache.nifi.stream.io.BufferedInputStream;
 import org.apache.nifi.provenance.StandardRecordReader;
 import org.apache.nifi.provenance.lucene.LuceneUtil;
+import org.apache.nifi.provenance.toc.StandardTocReader;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.provenance.toc.TocUtil;
 
 public class RecordReaders {
 
     public static RecordReader newRecordReader(File file, final Collection<Path> provenanceLogFiles) throws IOException {
-        if (!file.exists()) {
-            if (provenanceLogFiles == null) {
-                throw new FileNotFoundException(file.toString());
-            }
+        final File originalFile = file;
+        InputStream fis = null;
 
-            final String baseName = LuceneUtil.substringBefore(file.getName(), ".") + ".";
-            for (final Path path : provenanceLogFiles) {
-                if (path.toFile().getName().startsWith(baseName)) {
-                    file = path.toFile();
-                    break;
-                }
-            }
+        try {
+	        if (!file.exists()) {
+	            if (provenanceLogFiles != null) {
+		            final String baseName = LuceneUtil.substringBefore(file.getName(), ".") + ".";
+		            for (final Path path : provenanceLogFiles) {
+		                if (path.toFile().getName().startsWith(baseName)) {
+		                    file = path.toFile();
+		                    break;
+		                }
+		            }
+	            }
+	        }
+	
+	        if ( file.exists() ) {
+	            try {
+	                fis = new FileInputStream(file);
+	            } catch (final FileNotFoundException fnfe) {
+	                fis = null;
+	            }
+	        }
+	        
+	        String filename = file.getName();
+	        openStream: while ( fis == null ) {
+	            final File dir = file.getParentFile();
+	            final String baseName = LuceneUtil.substringBefore(file.getName(), ".");
+	            
+	            // depending on which rollover actions have occurred, the data may live in a file with
+	            // either the ".prov" or ".prov.gz" extension. The majority of the time we will find
+	            // ".prov.gz", because most often we are compressing on rollover and most often we have
+	            // already finished compressing by the time that we are querying the data.
+	            for ( final String extension : new String[] {".prov.gz", ".prov"} ) {
+	                file = new File(dir, baseName + extension);
+	                if ( file.exists() ) {
+	                    try {
+	                        fis = new FileInputStream(file);
+	                        filename = baseName + extension;
+	                        break openStream;
+	                    } catch (final FileNotFoundException fnfe) {
+	                        // file was modified by a RolloverAction after we verified that it exists but before we could
+	                        // create an InputStream for it. Start over.
+	                        fis = null;
+	                        continue openStream;
+	                    }
+	                }
+	            }
+	            
+	            break;
+	        }
+	
+	        if ( fis == null ) {
+	            throw new FileNotFoundException("Unable to locate file " + originalFile);
+	        }
+	
+	    	final File tocFile = TocUtil.getTocFile(file);
+	    	if ( tocFile.exists() ) {
+	    		final TocReader tocReader = new StandardTocReader(tocFile);
+	    		return new StandardRecordReader(fis, filename, tocReader);
+	    	} else {
+	    		return new StandardRecordReader(fis, filename);
+	    	}
+        } catch (final IOException ioe) {
+        	if ( fis != null ) {
+        		try {
+        			fis.close();
+        		} catch (final IOException inner) {
+        			ioe.addSuppressed(inner);
+        		}
+        	}
+        	
+        	throw ioe;
         }
-
-        if (file == null || !file.exists()) {
-            throw new FileNotFoundException(file.toString());
-        }
-
-        final InputStream fis = new FileInputStream(file);
-        final InputStream readableStream;
-        if (file.getName().endsWith(".gz")) {
-            readableStream = new BufferedInputStream(new GZIPInputStream(fis));
-        } else {
-            readableStream = new BufferedInputStream(fis);
-        }
-
-        final DataInputStream dis = new DataInputStream(readableStream);
-        @SuppressWarnings("unused")
-        final String repoClassName = dis.readUTF();
-        final int serializationVersion = dis.readInt();
-
-        return new StandardRecordReader(dis, serializationVersion, file.getName());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
index de98ab9..58f4dc2 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.toc.TocWriter;
 
 public interface RecordWriter extends Closeable {
 
@@ -82,4 +83,9 @@ public interface RecordWriter extends Closeable {
      */
     void sync() throws IOException;
 
+    /**
+     * Returns the TOC Writer that is being used to write the Table of Contents for this journal
+     * @return the TOC Writer for this journal, or <code>null</code> if no Table of Contents is being written
+     */
+    TocWriter getTocWriter();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
index 15349de..47b7c7e 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
@@ -20,11 +20,20 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.nifi.provenance.StandardRecordWriter;
+import org.apache.nifi.provenance.toc.StandardTocWriter;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.toc.TocWriter;
 
 public class RecordWriters {
+	private static final int DEFAULT_COMPRESSION_BLOCK_SIZE = 1024 * 1024;	// 1 MB
 
-    public static RecordWriter newRecordWriter(final File file) throws IOException {
-        return new StandardRecordWriter(file);
+    public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc) throws IOException {
+    	return newRecordWriter(file, compressed, createToc, DEFAULT_COMPRESSION_BLOCK_SIZE);
+    }
+    
+    public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc, final int compressionBlockBytes) throws IOException {
+    	final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null;
+        return new StandardRecordWriter(file, tocWriter, compressed, compressionBlockBytes);
     }
 
 }
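
A short usage sketch for the new factory signature: a compressed journal with a Table of Contents, using the default 1 MB compression block size (the journal file name is hypothetical, and the write calls are elided because the full RecordWriter contract is not shown in this hunk):

    import java.io.File;
    import java.io.IOException;

    import org.apache.nifi.provenance.serialization.RecordWriter;
    import org.apache.nifi.provenance.serialization.RecordWriters;

    public class RecordWriterSketch {
        public static void main(final String[] args) throws IOException {
            final File journal = new File("12345.prov");   // hypothetical journal file
            // compressed = true, createToc = true => a StandardTocWriter is created at toc/12345.toc
            try (final RecordWriter writer = RecordWriters.newRecordWriter(journal, true, true)) {
                // ... write provenance events via the RecordWriter contract ...
                writer.sync();
            }
        }
    }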

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
new file mode 100644
index 0000000..8944cec
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+/**
+ * Standard implementation of TocReader.
+ * 
+ * Expects .toc file to be in the following format:
+ * 
+ * byte 0: version
+ * byte 1: boolean: compressionFlag -> 0 = journal is NOT compressed, 1 = journal is compressed
+ * byte 2-9: long: offset of block 0
+ * byte 10-17: long: offset of block 1
+ * ...
+ * byte (N*8+2)-(N*8+9): long: offset of block N
+ */
+public class StandardTocReader implements TocReader {
+    private final boolean compressed;
+    private final long[] offsets;
+    
+    public StandardTocReader(final File file) throws IOException {
+        try (final FileInputStream fis = new FileInputStream(file);
+             final DataInputStream dis = new DataInputStream(fis)) {
+            
+            final int version = dis.read();
+            if ( version < 0 ) {
+                throw new EOFException();
+            }
+            
+            final int compressionFlag = dis.read();
+            if ( compressionFlag < 0 ) {
+                throw new EOFException();
+            }
+            
+            if ( compressionFlag == 0 ) {
+                compressed = false;
+            } else if ( compressionFlag == 1 ) {
+                compressed = true;
+            } else {
+                throw new IOException("Table of Contents appears to be corrupt: could not read 'compression flag' from header; expected value of 0 or 1 but got " + compressionFlag);
+            }
+            
+            final int numBlocks = (int) ((file.length() - 2) / 8);
+            offsets = new long[numBlocks];
+            
+            for (int i=0; i < numBlocks; i++) {
+                offsets[i] = dis.readLong();
+            }
+        }
+    }
+    
+    @Override
+    public boolean isCompressed() {
+        return compressed;
+    }
+    
+    @Override
+    public long getBlockOffset(final int blockIndex) {
+        if ( blockIndex >= offsets.length ) {
+            return -1L;
+        }
+        return offsets[blockIndex];
+    }
+
+    @Override
+    public long getLastBlockOffset() {
+        if ( offsets.length == 0 ) {
+            return 0L;
+        }
+        return offsets[offsets.length - 1];
+    }
+    
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public int getBlockIndex(final long blockOffset) {
+        for (int i=0; i < offsets.length; i++) {
+            if ( offsets[i] > blockOffset ) {
+                return i-1;
+            }
+        }
+        
+        return offsets.length - 1;
+    }
+
+}
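
Because StandardTocWriter appends offsets in increasing order, the linear scan in getBlockIndex could equivalently be a binary search. A hedged alternative sketch with the same semantics (returns the index of the last block starting at or before the given offset, or -1 if the offset precedes block 0), assuming strictly increasing offsets:

    import java.util.Arrays;

    final class TocBinarySearch {
        static int getBlockIndex(final long[] offsets, final long blockOffset) {
            final int idx = Arrays.binarySearch(offsets, blockOffset);
            if (idx >= 0) {
                return idx;                    // exact match: the offset is the start of block idx
            }
            final int insertionPoint = -idx - 1;
            return insertionPoint - 1;         // last block that starts at or before the offset
        }
    }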

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
new file mode 100644
index 0000000..17f1a59
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Standard implementation of {@link TocWriter}.
+ * 
+ * Format of .toc file:
+ * byte 0: version
+ * byte 1: compressed: 0 -> not compressed, 1 -> compressed
+ * byte 2-9: long: offset of block 0
+ * byte 10-17: long: offset of block 1
+ * ...
+ * byte (N*8+2)-(N*8+9): long: offset of block N
+ */
+public class StandardTocWriter implements TocWriter {
+	private static final Logger logger = LoggerFactory.getLogger(StandardTocWriter.class);
+	
+    public static final byte VERSION = 1;
+    
+    private final File file;
+    private final FileOutputStream fos;
+    private final boolean alwaysSync;
+    private int index = -1;
+    
+    /**
+     * Creates a StandardTocWriter that writes to the given file.
+     * @param file the file to write to
+     * @param compressionFlag whether or not the journal is compressed
+     * @param alwaysSync whether or not to sync the file to disk after every write
+     * @throws IOException if unable to create or write to the file
+     */
+    public StandardTocWriter(final File file, final boolean compressionFlag, final boolean alwaysSync) throws IOException {
+        if ( file.exists() ) {
+            // Check if the header actually exists. If so, throw FileAlreadyExistsException
+            // If no data is in the file, we will just overwrite it.
+            try (final InputStream fis = new FileInputStream(file);
+                 final InputStream bis = new BufferedInputStream(fis);
+                 final DataInputStream dis = new DataInputStream(bis)) {
+                dis.read(); // skip the version byte
+                dis.read(); // skip the compression flag
+
+                // we always add the first offset when the writer is created so we allow this to exist.
+                dis.readLong();
+                final int nextByte = dis.read();
+                
+                if ( nextByte > -1 ) {
+                    throw new FileAlreadyExistsException(file.getAbsolutePath());
+                }
+            } catch (final EOFException eof) {
+                // no real data. overwrite file.
+            }
+        }
+        
+        final File tocDir = file.getParentFile();
+        if ( !tocDir.exists() ) {
+        	Files.createDirectories(tocDir.toPath());
+        }
+        
+        this.file = file;
+        fos = new FileOutputStream(file);
+        this.alwaysSync = alwaysSync;
+
+        final byte[] header = new byte[2];
+        header[0] = VERSION;
+        header[1] = (byte) (compressionFlag ? 1 : 0);
+        fos.write(header);
+        fos.flush();
+        
+        if ( alwaysSync ) {
+            sync();
+        }
+    }
+    
+    @Override
+    public void addBlockOffset(final long offset) throws IOException {
+        final BufferedOutputStream bos = new BufferedOutputStream(fos);
+        final DataOutputStream dos = new DataOutputStream(bos);
+        dos.writeLong(offset);
+        dos.flush();
+        index++;
+        logger.debug("Adding block {} at offset {}", index, offset);
+        
+        if ( alwaysSync ) {
+            sync();
+        }
+    }
+    
+    @Override
+    public void sync() throws IOException {
+    	fos.getFD().sync();
+    }
+    
+    @Override
+    public int getCurrentBlockIndex() {
+        return index;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (alwaysSync) {
+            fos.getFD().sync();
+        }
+        
+        fos.close();
+    }
+    
+    @Override
+    public File getFile() {
+        return file;
+    }
+    
+    @Override
+    public String toString() {
+        return "TOC Writer for " + file;
+    }
+}
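
The writer is driven by whatever writes the journal: one addBlockOffset call per compression block, as each block begins. A minimal sketch (the file path and offsets are hypothetical; alwaysSync = false skips the per-write fsync):

    import java.io.File;
    import java.io.IOException;

    import org.apache.nifi.provenance.toc.StandardTocWriter;
    import org.apache.nifi.provenance.toc.TocWriter;

    public class TocWriterSketch {
        public static void main(final String[] args) throws IOException {
            final File tocFile = new File("toc/12345.toc");    // hypothetical TOC file
            // compressionFlag = true (journal is compressed), alwaysSync = false
            try (final TocWriter tocWriter = new StandardTocWriter(tocFile, true, false)) {
                tocWriter.addBlockOffset(0L);          // block 0 starts at the head of the journal
                tocWriter.addBlockOffset(1048576L);    // block 1 begins after ~1 MB of journal data
                tocWriter.sync();
            }
        }
    }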

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
new file mode 100644
index 0000000..7c197be
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.Closeable;
+
+/**
+ * <p>
+ * Reads a Table of Contents (.toc file) for a corresponding Journal File. We use a Table of Contents
+ * to map a Block Index to the offset into the Journal file where that Block begins. This allows us to
+ * persist a Block Index for an event and compress the Journal later: we get good compression by
+ * compressing a large batch of events at once, we can look up an event in an uncompressed Journal via
+ * the Table of Contents, and we can still look up an event after compression by simply rewriting the
+ * TOC as we compress the data.
+ * </p>
+ */
+public interface TocReader extends Closeable {
+
+    /**
+     * Indicates whether or not the corresponding Journal file is compressed
+     * @return <code>true</code> if the journal is compressed, <code>false</code> otherwise
+     */
+    boolean isCompressed();
+
+    /**
+     * Returns the byte offset into the Journal File for the Block with the given index.
+     * @param blockIndex the index of the Block
+     * @return the byte offset of the Block, or -1 if the Block Index is out of range
+     */
+    long getBlockOffset(int blockIndex);
+    
+    /**
+     * Returns the byte offset into the Journal File of the last Block in the Table of Contents
+     * @return the byte offset of the last Block
+     */
+    long getLastBlockOffset();
+    
+    /**
+     * Returns the index of the block that contains the given offset
+     * @param blockOffset a byte offset into the Journal File
+     * @return the index of the Block that contains the given offset
+     */
+    int getBlockIndex(long blockOffset);
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
new file mode 100644
index 0000000..c30ac98
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.File;
+
+import org.apache.nifi.provenance.lucene.LuceneUtil;
+
+public class TocUtil {
+
+	/**
+	 * Returns the file that should be used as the Table of Contents for the given Journal File
+	 * @param journalFile the Journal File for which a Table of Contents is desired
+	 * @return the File that should hold the Table of Contents
+	 */
+	public static File getTocFile(final File journalFile) {
+    	final File tocDir = new File(journalFile.getParentFile(), "toc");
+    	final String basename = LuceneUtil.substringBefore(journalFile.getName(), ".");
+    	final File tocFile = new File(tocDir, basename + ".toc");
+    	return tocFile;
+	}
+	
+}
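
For example (paths hypothetical), the mapping strips everything after the first '.' in the journal name and places the TOC in a sibling "toc" directory:

    import java.io.File;

    import org.apache.nifi.provenance.toc.TocUtil;

    public class TocUtilSketch {
        public static void main(final String[] args) {
            final File toc = TocUtil.getTocFile(new File("/repo/journals/12345.prov.gz"));
            System.out.println(toc);    // prints /repo/journals/toc/12345.toc
        }
    }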

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
new file mode 100644
index 0000000..c678053
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Writes a .toc file
+ */
+public interface TocWriter extends Closeable {
+
+    /**
+     * Adds the given block offset as the next Block Offset in the Table of Contents.
+     * @param offset the byte offset at which the next block begins
+     * @throws IOException if unable to persist the offset
+     */
+    void addBlockOffset(long offset) throws IOException;
+    
+    /**
+     * Returns the index of the current Block.
+     * @return the index of the block that is currently being written
+     */
+    int getCurrentBlockIndex();
+    
+    /**
+     * Returns the file that is currently being written to.
+     * @return the file that this writer is currently writing to
+     */
+    File getFile();
+
+    /**
+     * Synchronizes the data with the underlying storage device.
+     * @throws IOException if unable to synchronize the data with the underlying storage device
+     */
+    void sync() throws IOException;
+}
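
The intended usage pattern of this interface: each time the serializer begins a new compression block, it reports the journal's current byte offset so that readers can later seek directly to a block. A rough sketch under assumed names; BlockTracker, BLOCK_SIZE, and recordWritten are illustrative and not part of this commit:

    import java.io.IOException;

    import org.apache.nifi.provenance.toc.TocWriter;

    public class BlockTracker {
        // Illustrative threshold: start a new block after each 1 MB of uncompressed data.
        private static final long BLOCK_SIZE = 1024L * 1024L;

        private final TocWriter tocWriter;
        private long bytesInCurrentBlock = 0L;

        public BlockTracker(final TocWriter tocWriter) {
            this.tocWriter = tocWriter;
        }

        // Called after each record is serialized; fileOffset is the journal's current byte offset.
        public void recordWritten(final long recordBytes, final long fileOffset) throws IOException {
            bytesInCurrentBlock += recordBytes;
            if (bytesInCurrentBlock >= BLOCK_SIZE) {
                tocWriter.addBlockOffset(fileOffset); // index where the next compression block begins
                bytesInCurrentBlock = 0L;
            }
        }
    }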

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 5be208b..7822e5c 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.provenance;
 
+import static org.apache.nifi.provenance.TestUtil.createFlowFile;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -25,10 +26,8 @@ import java.io.FileFilter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -45,7 +44,6 @@ import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.provenance.lineage.EventNode;
 import org.apache.nifi.provenance.lineage.Lineage;
 import org.apache.nifi.provenance.lineage.LineageEdge;
@@ -59,8 +57,10 @@ import org.apache.nifi.provenance.search.SearchableField;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
 import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.file.FileUtils;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -72,87 +72,47 @@ public class TestPersistentProvenanceRepository {
     public TestName name = new TestName();
 
     private PersistentProvenanceRepository repo;
+    private RepositoryConfiguration config;
     
     public static final int DEFAULT_ROLLOVER_MILLIS = 2000;
 
     private RepositoryConfiguration createConfiguration() {
-        final RepositoryConfiguration config = new RepositoryConfiguration();
+        config = new RepositoryConfiguration();
         config.addStorageDirectory(new File("target/storage/" + UUID.randomUUID().toString()));
-        config.setCompressOnRollover(false);
+        config.setCompressOnRollover(true);
         config.setMaxEventFileLife(2000L, TimeUnit.SECONDS);
+        config.setCompressionBlockBytes(100);
         return config;
     }
 
+    @BeforeClass
+    public static void setLogLevel() {
+    	System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
+    }
+    
     @Before
     public void printTestName() {
         System.out.println("\n\n\n***********************  " + name.getMethodName() + "  *****************************");
     }
 
     @After
-    public void closeRepo() {
+    public void closeRepo() throws IOException {
         if (repo != null) {
             try {
                 repo.close();
             } catch (final IOException ioe) {
             }
         }
+        
+        // Delete all of the storage files. We do this not only to clean up the many files that
+        // we create but also to verify that we have closed all of the file handles. If any
+        // streams were left open, the delete will throw an IOException, causing the unit test to fail.
+        for (final File storageDir : config.getStorageDirectories()) {
+            FileUtils.deleteFile(storageDir, true);
+        }
     }
 
-    private FlowFile createFlowFile(final long id, final long fileSize, final Map<String, String> attributes) {
-        final Map<String, String> attrCopy = new HashMap<>(attributes);
-
-        return new FlowFile() {
-            @Override
-            public long getId() {
-                return id;
-            }
-
-            @Override
-            public long getEntryDate() {
-                return System.currentTimeMillis();
-            }
-
-            @Override
-            public Set<String> getLineageIdentifiers() {
-                return new HashSet<String>();
-            }
-
-            @Override
-            public long getLineageStartDate() {
-                return System.currentTimeMillis();
-            }
-
-            @Override
-            public Long getLastQueueDate() {
-                return System.currentTimeMillis();
-            }
-
-            @Override
-            public boolean isPenalized() {
-                return false;
-            }
-
-            @Override
-            public String getAttribute(final String s) {
-                return attrCopy.get(s);
-            }
-
-            @Override
-            public long getSize() {
-                return fileSize;
-            }
-
-            @Override
-            public Map<String, String> getAttributes() {
-                return attrCopy;
-            }
-
-            @Override
-            public int compareTo(final FlowFile o) {
-                return 0;
-            }
-        };
-    }
+    
 
     private EventReporter getEventReporter() {
         return new EventReporter() {
@@ -261,6 +221,8 @@ public class TestPersistentProvenanceRepository {
             repo.registerEvent(record);
         }
 
+        Thread.sleep(1000L); // allow time for the registered events to be written out before closing
+        
         repo.close();
         Thread.sleep(500L); // Give the repo time to shutdown (i.e., close all file handles, etc.)
 
@@ -417,10 +379,10 @@ public class TestPersistentProvenanceRepository {
     @Test
     public void testIndexAndCompressOnRolloverAndSubsequentSearch() throws IOException, InterruptedException, ParseException {
         final RepositoryConfiguration config = createConfiguration();
-        config.setMaxRecordLife(3, TimeUnit.SECONDS);
-        config.setMaxStorageCapacity(1024L * 1024L);
+        config.setMaxRecordLife(30, TimeUnit.SECONDS);
+        config.setMaxStorageCapacity(1024L * 1024L * 10);
         config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
-        config.setMaxEventFileCapacity(1024L * 1024L);
+        config.setMaxEventFileCapacity(1024L * 1024L * 10);
         config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
 
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
@@ -923,12 +885,16 @@ public class TestPersistentProvenanceRepository {
         final PersistentProvenanceRepository secondRepo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
         secondRepo.initialize(getEventReporter());
 
-        final ProvenanceEventRecord event11 = builder.build();
-        secondRepo.registerEvent(event11);
-        secondRepo.waitForRollover();
-        final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L);
-        assertNotNull(event11Retrieved);
-        assertEquals(10, event11Retrieved.getEventId());
+        try {
+            final ProvenanceEventRecord event11 = builder.build();
+            secondRepo.registerEvent(event11);
+            secondRepo.waitForRollover();
+            final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L);
+            assertNotNull(event11Retrieved);
+            assertEquals(10, event11Retrieved.getEventId());
+        } finally {
+            secondRepo.close();
+        }
     }
 
     @Test
@@ -998,6 +964,9 @@ public class TestPersistentProvenanceRepository {
         storageDirFiles = config.getStorageDirectories().get(0).listFiles(indexFileFilter);
         assertEquals(0, storageDirFiles.length);
     }
+    
+    // TODO: Test EOF on merge
+    // TODO: Test journal with no records
 
     @Test
     public void testTextualQuery() throws InterruptedException, IOException, ParseException {
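
The change to createConfiguration() above is the crux of these test updates: compression is now enabled on rollover, and the compression block size is deliberately tiny so that even small tests produce multi-block event files. Stated as a standalone sketch (same package as the test; only the storage path is illustrative):

    // Mirrors the test configuration; the storage directory name is made up.
    final RepositoryConfiguration config = new RepositoryConfiguration();
    config.addStorageDirectory(new File("target/storage/example"));
    config.setCompressOnRollover(true);       // compress event files when they roll over
    config.setCompressionBlockBytes(100);     // tiny blocks force many blocks per file
    config.setMaxEventFileLife(2000L, TimeUnit.SECONDS);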

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a1027aea/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
new file mode 100644
index 0000000..3b0693a
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import static org.apache.nifi.provenance.TestUtil.createFlowFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.nifi.provenance.toc.StandardTocReader;
+import org.apache.nifi.provenance.toc.StandardTocWriter;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestStandardRecordReaderWriter {
+    @BeforeClass
+    public static void setLogLevel() {
+    	System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
+    }
+
+	private ProvenanceEventRecord createEvent() {
+		final Map<String, String> attributes = new HashMap<>();
+		attributes.put("filename", "1.txt");
+        attributes.put("uuid", UUID.randomUUID().toString());
+
+		final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+        final ProvenanceEventRecord record = builder.build();
+
+        return record;
+	}
+	
+	@Test
+	public void testSimpleWriteWithToc() throws IOException {
+        final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite");
+        final File tocFile = TocUtil.getTocFile(journalFile);
+        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
+        final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, false, 1024 * 1024);
+        
+        writer.writeHeader();
+        writer.writeRecord(createEvent(), 1L);
+        writer.close();
+
+        final TocReader tocReader = new StandardTocReader(tocFile);
+        
+        try (final FileInputStream fis = new FileInputStream(journalFile);
+        	final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+        	assertEquals(0, reader.getBlockIndex());
+        	reader.skipToBlock(0);
+        	StandardProvenanceEventRecord recovered = reader.nextRecord();
+        	assertNotNull(recovered);
+        	
+        	assertEquals("nifi://unit-test", recovered.getTransitUri());
+        	assertNull(reader.nextRecord());
+        }
+	}
+	
+	
+	@Test
+	public void testSingleRecordCompressed() throws IOException {
+        final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
+        final File tocFile = TocUtil.getTocFile(journalFile);
+        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
+        final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100);
+        
+        writer.writeHeader();
+        writer.writeRecord(createEvent(), 1L);
+        writer.close();
+
+        final TocReader tocReader = new StandardTocReader(tocFile);
+        
+        try (final FileInputStream fis = new FileInputStream(journalFile);
+        	final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+        	assertEquals(0, reader.getBlockIndex());
+        	reader.skipToBlock(0);
+        	StandardProvenanceEventRecord recovered = reader.nextRecord();
+        	assertNotNull(recovered);
+        	
+        	assertEquals("nifi://unit-test", recovered.getTransitUri());
+        	assertNull(reader.nextRecord());
+        }
+	}
+	
+	
+	@Test
+	public void testMultipleRecordsSameBlockCompressed() throws IOException {
+        final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
+        final File tocFile = TocUtil.getTocFile(journalFile);
+        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
+        // new compression block each 1 MB of uncompressed data
+        final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 1024 * 1024);
+        
+        writer.writeHeader();
+        for (int i=0; i < 10; i++) {
+        	writer.writeRecord(createEvent(), i);
+        }
+        writer.close();
+
+        final TocReader tocReader = new StandardTocReader(tocFile);
+        
+        try (final FileInputStream fis = new FileInputStream(journalFile);
+        	final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+        	for (int i=0; i < 10; i++) {
+	        	assertEquals(0, reader.getBlockIndex());
+	        	
+	        	// call skipToBlock for the first several iterations to ensure that we can;
+	        	// avoid calling it for the remaining iterations to ensure that it's okay not to.
+	        	if (i <= 5) {
+	        		reader.skipToBlock(0);
+	        	}
+	        	
+	        	StandardProvenanceEventRecord recovered = reader.nextRecord();
+	        	assertNotNull(recovered);
+	        	assertEquals("nifi://unit-test", recovered.getTransitUri());
+        	}
+        	
+        	assertNull(reader.nextRecord());
+        }
+	}
+	
+	
+	@Test
+	public void testMultipleRecordsMultipleBlocksCompressed() throws IOException {
+        final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
+        final File tocFile = TocUtil.getTocFile(journalFile);
+        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
+        // new compression block each 100 bytes of uncompressed data
+        final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100);
+        
+        writer.writeHeader();
+        for (int i=0; i < 10; i++) {
+        	writer.writeRecord(createEvent(), i);
+        }
+        writer.close();
+
+        final TocReader tocReader = new StandardTocReader(tocFile);
+        
+        try (final FileInputStream fis = new FileInputStream(journalFile);
+        	final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+        	for (int i=0; i < 10; i++) {
+	        	StandardProvenanceEventRecord recovered = reader.nextRecord();
+	        	System.out.println(recovered);
+	        	assertNotNull(recovered);
+	        	assertEquals((long) i, recovered.getEventId());
+	        	assertEquals("nifi://unit-test", recovered.getTransitUri());
+        	}
+        	
+        	assertNull(reader.nextRecord());
+        }
+	}
+}
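
Taken together, these tests exercise the read path that the Table of Contents enables: seek to a block, then read records without decompressing the rest of the file. As a standalone sketch (the file path and block index are illustrative; the classes and constructors are those used in the tests above, from org.apache.nifi.provenance and its toc subpackage):

    import java.io.File;
    import java.io.FileInputStream;

    import org.apache.nifi.provenance.toc.StandardTocReader;
    import org.apache.nifi.provenance.toc.TocReader;
    import org.apache.nifi.provenance.toc.TocUtil;

    final File journalFile = new File("target/storage/example/1.prov.gz");
    final TocReader tocReader = new StandardTocReader(TocUtil.getTocFile(journalFile));

    try (final FileInputStream fis = new FileInputStream(journalFile);
         final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
        reader.skipToBlock(2); // jump straight to the third compression block
        final StandardProvenanceEventRecord record = reader.nextRecord();
        // only the block containing this record had to be decompressed
    }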