You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@nifi.apache.org by oz...@apache.org on 2016/11/18 19:54:03 UTC
[3/7] nifi git commit: NIFI-2854: Refactor repositories and swap
files to use schema-based serialization so that nifi can be rolled back to a
previous version after an upgrade.
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index 97226b2..46bea31 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -22,26 +22,20 @@ import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.file.Files;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.SwapContents;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.events.EventReporter;
import org.junit.Test;
import org.mockito.Mockito;
@@ -56,7 +50,9 @@ public class TestFileSystemSwapManager {
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
- final SwapContents swapContents = FileSystemSwapManager.deserializeFlowFiles(in, "/src/test/resources/old-swap-file.swap", flowFileQueue, new NopResourceClaimManager());
+ final FileSystemSwapManager swapManager = createSwapManager();
+ final SwapContents swapContents = swapManager.peek("src/test/resources/old-swap-file.swap", flowFileQueue);
+
final List<FlowFileRecord> records = swapContents.getFlowFiles();
assertEquals(10000, records.size());
@@ -67,53 +63,32 @@ public class TestFileSystemSwapManager {
}
}
- @Test
- public void testRoundTripSerializeDeserialize() throws IOException {
- final List<FlowFileRecord> toSwap = new ArrayList<>(10000);
- final Map<String, String> attrs = new HashMap<>();
- for (int i = 0; i < 10000; i++) {
- attrs.put("i", String.valueOf(i));
- final FlowFileRecord ff = new TestFlowFile(attrs, i);
- toSwap.add(ff);
- }
-
- final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
- Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
- final String swapLocation = "target/testRoundTrip.swap";
- final File swapFile = new File(swapLocation);
- Files.deleteIfExists(swapFile.toPath());
+ private FileSystemSwapManager createSwapManager() {
+ final FileSystemSwapManager swapManager = new FileSystemSwapManager();
+ final ResourceClaimManager resourceClaimManager = new NopResourceClaimManager();
+ final FlowFileRepository flowfileRepo = Mockito.mock(FlowFileRepository.class);
+ swapManager.initialize(new SwapManagerInitializationContext() {
+ @Override
+ public ResourceClaimManager getResourceClaimManager() {
+ return resourceClaimManager;
+ }
- try (final FileOutputStream fos = new FileOutputStream(swapFile)) {
- FileSystemSwapManager.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
- }
+ @Override
+ public FlowFileRepository getFlowFileRepository() {
+ return flowfileRepo;
+ }
- final SwapContents swappedIn;
- try (final FileInputStream fis = new FileInputStream(swapFile);
- final DataInputStream dis = new DataInputStream(fis)) {
- swappedIn = FileSystemSwapManager.deserializeFlowFiles(dis, swapLocation, flowFileQueue, Mockito.mock(ResourceClaimManager.class));
- }
+ @Override
+ public EventReporter getEventReporter() {
+ return EventReporter.NO_OP;
+ }
+ });
- assertEquals(toSwap.size(), swappedIn.getFlowFiles().size());
- for (int i = 0; i < toSwap.size(); i++) {
- final FlowFileRecord pre = toSwap.get(i);
- final FlowFileRecord post = swappedIn.getFlowFiles().get(i);
-
- assertEquals(pre.getSize(), post.getSize());
- assertEquals(pre.getAttributes(), post.getAttributes());
- assertEquals(pre.getSize(), post.getSize());
- assertEquals(pre.getId(), post.getId());
- assertEquals(pre.getContentClaim(), post.getContentClaim());
- assertEquals(pre.getContentClaimOffset(), post.getContentClaimOffset());
- assertEquals(pre.getEntryDate(), post.getEntryDate());
- assertEquals(pre.getLastQueueDate(), post.getLastQueueDate());
- assertEquals(pre.getLineageStartDate(), post.getLineageStartDate());
- assertEquals(pre.getPenaltyExpirationMillis(), post.getPenaltyExpirationMillis());
- }
+ return swapManager;
}
public class NopResourceClaimManager implements ResourceClaimManager {
-
@Override
public ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant, boolean writable) {
return null;
@@ -165,89 +140,4 @@ public class TestFileSystemSwapManager {
}
}
- private static class TestFlowFile implements FlowFileRecord {
-
- private static final AtomicLong idGenerator = new AtomicLong(0L);
-
- private final long id = idGenerator.getAndIncrement();
- private final long entryDate = System.currentTimeMillis();
- private final long lastQueueDate = System.currentTimeMillis();
- private final Map<String, String> attributes;
- private final long size;
-
- public TestFlowFile(final Map<String, String> attributes, final long size) {
- this.attributes = attributes;
- this.size = size;
- }
-
- @Override
- public long getId() {
- return id;
- }
-
- @Override
- public long getEntryDate() {
- return entryDate;
- }
-
- @Override
- public long getLineageStartDate() {
- return entryDate;
- }
-
- @Override
- public Long getLastQueueDate() {
- return lastQueueDate;
- }
-
- @Override
- public boolean isPenalized() {
- return false;
- }
-
- @Override
- public String getAttribute(String key) {
- return attributes.get(key);
- }
-
- @Override
- public long getSize() {
- return size;
- }
-
- @Override
- public Map<String, String> getAttributes() {
- return Collections.unmodifiableMap(attributes);
- }
-
- @Override
- public int compareTo(final FlowFile o) {
- return Long.compare(id, o.getId());
- }
-
- @Override
- public long getPenaltyExpirationMillis() {
- return -1L;
- }
-
- @Override
- public ContentClaim getContentClaim() {
- return null;
- }
-
- @Override
- public long getContentClaimOffset() {
- return 0;
- }
-
- @Override
- public long getLineageStartIndex() {
- return 0;
- }
-
- @Override
- public long getQueueDateIndex() {
- return 0;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index b2ea0b9..6525822 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -49,6 +49,7 @@ import org.apache.nifi.controller.swap.StandardSwapContents;
import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.file.FileUtils;
+import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -64,6 +65,7 @@ public class TestWriteAheadFlowFileRepository {
}
@Before
+ @After
public void clearRepo() throws IOException {
final File target = new File("target");
final File testRepo = new File(target, "test-repo");
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/MockFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/MockFlowFile.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/MockFlowFile.java
new file mode 100644
index 0000000..7b5f72c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/MockFlowFile.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.flowfile.FlowFile;
+
+/**
+ * Minimal, immutable {@link FlowFileRecord} used by the swap serializer/deserializer tests.
+ * IDs are drawn from a static counter shared by all instances; tests call {@link #resetIdGenerator()}
+ * so that generated IDs are predictable (0, 1, 2, ...) within a single test.
+ */
+public class MockFlowFile implements FlowFileRecord {
+    private static final AtomicLong idGenerator = new AtomicLong(0L);
+
+    private final long id;
+    private final long entryDate = System.currentTimeMillis();
+    private final long lastQueueDate = System.currentTimeMillis();
+    private final Map<String, String> attributes;
+    private final long size;
+    private final ContentClaim contentClaim;
+
+    /**
+     * Creates a FlowFile with the next generated ID and a new content claim whose resource claim ID
+     * is that same FlowFile ID.
+     *
+     * @param attributes attributes to copy into the FlowFile
+     * @param size the value reported by {@link #getSize()}
+     * @param claimManager manager used to create the resource claim and increment its claimant count
+     */
+    public MockFlowFile(final Map<String, String> attributes, final long size, final ResourceClaimManager claimManager) {
+        // Reserve the ID exactly once and derive the claim ID from it. Reading the counter with get()
+        // and incrementing it in a separate call is not atomic, so a claim could end up named after a
+        // different FlowFile's ID.
+        this(idGenerator.getAndIncrement(), attributes, size, claimManager);
+    }
+
+    /**
+     * Creates a FlowFile with the next generated ID that references the given content claim.
+     *
+     * @param attributes attributes to copy into the FlowFile
+     * @param size the value reported by {@link #getSize()}
+     * @param contentClaim the claim returned by {@link #getContentClaim()}; may be shared between FlowFiles
+     */
+    public MockFlowFile(final Map<String, String> attributes, final long size, final ContentClaim contentClaim) {
+        this(idGenerator.getAndIncrement(), attributes, size, contentClaim);
+    }
+
+    /**
+     * Creates a FlowFile with an explicit ID; the shared ID counter is neither read nor advanced.
+     *
+     * @param id the FlowFile ID
+     * @param attributes attributes to copy into the FlowFile
+     * @param size the value reported by {@link #getSize()}
+     * @param contentClaim the claim returned by {@link #getContentClaim()}
+     */
+    public MockFlowFile(final long id, final Map<String, String> attributes, final long size, final ContentClaim contentClaim) {
+        this.id = id;
+        this.attributes = new HashMap<>(attributes);
+        this.size = size;
+        this.contentClaim = contentClaim;
+    }
+
+    // Builds a dedicated content claim named after an already-reserved FlowFile ID.
+    private MockFlowFile(final long id, final Map<String, String> attributes, final long size, final ResourceClaimManager claimManager) {
+        this(id, attributes, size, createContentClaim(String.valueOf(id), claimManager));
+    }
+
+    /**
+     * Creates a non-loss-tolerant, non-writable resource claim in container {@code "container"} and
+     * section {@code "section"} with the given ID, increments its claimant count, and wraps it in a
+     * {@link StandardContentClaim} constructed with the constant {@code 3L} (presumably the offset).
+     *
+     * @param id the resource claim ID
+     * @param claimManager manager used to create the resource claim
+     * @return the new content claim
+     */
+    public static ContentClaim createContentClaim(final String id, final ResourceClaimManager claimManager) {
+        final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", id, false, false);
+        claimManager.incrementClaimantCount(resourceClaim);
+        return new StandardContentClaim(resourceClaim, 3L);
+    }
+
+    @Override
+    public long getId() {
+        return id;
+    }
+
+    @Override
+    public long getEntryDate() {
+        return entryDate;
+    }
+
+    @Override
+    public long getLineageStartDate() {
+        // The mock treats the entry date as the lineage start date.
+        return entryDate;
+    }
+
+    @Override
+    public Long getLastQueueDate() {
+        return lastQueueDate;
+    }
+
+    @Override
+    public boolean isPenalized() {
+        return false;
+    }
+
+    @Override
+    public String getAttribute(String key) {
+        return attributes.get(key);
+    }
+
+    @Override
+    public long getSize() {
+        return size;
+    }
+
+    @Override
+    public Map<String, String> getAttributes() {
+        return Collections.unmodifiableMap(attributes);
+    }
+
+    @Override
+    public int compareTo(final FlowFile o) {
+        return Long.compare(id, o.getId());
+    }
+
+    @Override
+    public long getPenaltyExpirationMillis() {
+        return -1L;
+    }
+
+    @Override
+    public ContentClaim getContentClaim() {
+        return contentClaim;
+    }
+
+    @Override
+    public long getContentClaimOffset() {
+        // Fixed value; not derived from the content claim.
+        return 1;
+    }
+
+    @Override
+    public long getLineageStartIndex() {
+        return 0;
+    }
+
+    @Override
+    public long getQueueDateIndex() {
+        return 0;
+    }
+
+    /**
+     * Resets the shared ID counter so that the next generated ID is 0.
+     */
+    public static void resetIdGenerator() {
+        idGenerator.set(0L);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/TestSchemaSwapSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/TestSchemaSwapSerializerDeserializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/TestSchemaSwapSerializerDeserializer.java
new file mode 100644
index 0000000..8565f38
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/TestSchemaSwapSerializerDeserializer.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.SwapContents;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Round-trip and (manual) performance tests for {@link SchemaSwapSerializer} and {@link SchemaSwapDeserializer}.
+ */
+public class TestSchemaSwapSerializerDeserializer {
+
+    @Before
+    public void setup() {
+        // MockFlowFile IDs come from a static counter; reset it so every test sees IDs starting at 0.
+        MockFlowFile.resetIdGenerator();
+    }
+
+    /**
+     * Serializes 10,000 FlowFiles and verifies that the swap summary read back reports the expected
+     * count, byte total, max FlowFile ID and resource claims. The first two FlowFiles share one claim.
+     */
+    @Test
+    public void testRoundTripSerializeDeserializeSummary() throws IOException {
+        final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
+
+        final List<FlowFileRecord> toSwap = new ArrayList<>(10000);
+        final Map<String, String> attrs = new HashMap<>();
+        long size = 0L;
+        // Shared by FlowFiles 0 and 1; every other FlowFile gets its own claim named after its ID.
+        final ContentClaim firstClaim = MockFlowFile.createContentClaim("id", resourceClaimManager);
+        for (int i = 0; i < 10000; i++) {
+            attrs.put("i", String.valueOf(i));
+            final FlowFileRecord ff = i < 2 ? new MockFlowFile(attrs, i, firstClaim) : new MockFlowFile(attrs, i, resourceClaimManager);
+            toSwap.add(ff);
+            size += i;
+        }
+
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
+
+        // File name is unique to this test so it cannot collide with other swap-file tests.
+        final String swapLocation = "target/testRoundTripSummary.swap";
+        final File swapFile = new File(swapLocation);
+        Files.deleteIfExists(swapFile.toPath());
+
+        try {
+            final SwapSerializer serializer = new SchemaSwapSerializer();
+            try (final FileOutputStream fos = new FileOutputStream(swapFile)) {
+                serializer.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
+            }
+
+            final SwapDeserializer deserializer = new SchemaSwapDeserializer();
+            final SwapSummary swapSummary;
+            try (final FileInputStream fis = new FileInputStream(swapFile);
+                final DataInputStream dis = new DataInputStream(fis)) {
+
+                swapSummary = deserializer.getSwapSummary(dis, swapLocation, resourceClaimManager);
+            }
+
+            assertEquals(10000, swapSummary.getQueueSize().getObjectCount());
+            assertEquals(size, swapSummary.getQueueSize().getByteCount());
+            assertEquals(9999, swapSummary.getMaxFlowFileId().intValue());
+
+            // One entry per FlowFile; the two FlowFiles sharing "id" collapse to a single unique claim.
+            final List<ResourceClaim> resourceClaims = swapSummary.getResourceClaims();
+            assertEquals(10000, resourceClaims.size());
+            assertFalse(resourceClaims.stream().anyMatch(claim -> claim == null));
+            assertEquals(2, resourceClaims.stream().filter(claim -> claim.getId().equals("id")).collect(Collectors.counting()).intValue());
+
+            final Set<ResourceClaim> uniqueClaims = new HashSet<>(resourceClaims);
+            assertEquals(9999, uniqueClaims.size());
+        } finally {
+            Files.deleteIfExists(swapFile.toPath());
+        }
+    }
+
+    /**
+     * Serializes 10,000 FlowFiles through buffered streams and verifies that the fully deserialized
+     * swap contents preserve the summary, order, attributes and sizes.
+     */
+    @Test
+    public void testRoundTripSerializeDeserializeFullSwapFile() throws IOException, InterruptedException {
+        final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
+
+        final List<FlowFileRecord> toSwap = new ArrayList<>(10000);
+        final Map<String, String> attrs = new HashMap<>();
+        long size = 0L;
+        for (int i = 0; i < 10000; i++) {
+            attrs.put("i", String.valueOf(i));
+            final FlowFileRecord ff = new MockFlowFile(attrs, i, resourceClaimManager);
+            toSwap.add(ff);
+            size += i;
+        }
+
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
+
+        // File name is unique to this test so it cannot collide with other swap-file tests.
+        final String swapLocation = "target/testRoundTripFull.swap";
+        final File swapFile = new File(swapLocation);
+        Files.deleteIfExists(swapFile.toPath());
+
+        try {
+            final SwapSerializer serializer = new SchemaSwapSerializer();
+            try (final OutputStream fos = new FileOutputStream(swapFile);
+                final OutputStream out = new BufferedOutputStream(fos)) {
+                serializer.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, out);
+            }
+
+            final SwapContents contents;
+            final SwapDeserializer deserializer = new SchemaSwapDeserializer();
+            try (final FileInputStream fis = new FileInputStream(swapFile);
+                final InputStream bufferedIn = new BufferedInputStream(fis);
+                final DataInputStream dis = new DataInputStream(bufferedIn)) {
+
+                contents = deserializer.deserializeFlowFiles(dis, swapLocation, flowFileQueue, resourceClaimManager);
+            }
+
+            final SwapSummary swapSummary = contents.getSummary();
+            assertEquals(10000, swapSummary.getQueueSize().getObjectCount());
+            assertEquals(size, swapSummary.getQueueSize().getByteCount());
+            assertEquals(9999, swapSummary.getMaxFlowFileId().intValue());
+
+            assertEquals(10000, contents.getFlowFiles().size());
+
+            // FlowFile i was written with attribute i=<i> and size i; order must be preserved.
+            int counter = 0;
+            for (final FlowFileRecord flowFile : contents.getFlowFiles()) {
+                final int i = counter++;
+                assertEquals(String.valueOf(i), flowFile.getAttribute("i"));
+                assertEquals(i, flowFile.getSize());
+            }
+        } finally {
+            Files.deleteIfExists(swapFile.toPath());
+        }
+    }
+
+    /**
+     * Times 1,000 serializations of 10,000 FlowFiles to a {@link NullOutputStream}; no file is written.
+     */
+    @Test
+    @Ignore("For manual testing, in order to ensure that changes do not negatively impact performance")
+    public void testWritePerformance() throws IOException, InterruptedException {
+        final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
+
+        final List<FlowFileRecord> toSwap = new ArrayList<>(10000);
+        final Map<String, String> attrs = new HashMap<>();
+        for (int i = 0; i < 10000; i++) {
+            attrs.put("i", String.valueOf(i));
+            final FlowFileRecord ff = new MockFlowFile(attrs, i, resourceClaimManager);
+            toSwap.add(ff);
+        }
+
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
+
+        // Only passed to the serializer as a location string; nothing is written to this path.
+        final String swapLocation = "target/testRoundTrip.swap";
+
+        final int iterations = 1000;
+
+        final long start = System.nanoTime();
+        final SwapSerializer serializer = new SchemaSwapSerializer();
+        for (int i = 0; i < iterations; i++) {
+            try (final OutputStream out = new NullOutputStream()) {
+                serializer.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, out);
+            }
+        }
+
+        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        System.out.println("Wrote " + iterations + " Swap Files in " + millis + " millis");
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/TestSimpleSwapSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/TestSimpleSwapSerializerDeserializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/TestSimpleSwapSerializerDeserializer.java
new file mode 100644
index 0000000..0458333
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/TestSimpleSwapSerializerDeserializer.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.SwapContents;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Round-trip and (manual) performance tests for {@link SimpleSwapSerializer} and {@link SimpleSwapDeserializer}.
+ */
+public class TestSimpleSwapSerializerDeserializer {
+    @Before
+    public void setup() {
+        // MockFlowFile IDs come from a static counter; reset it so every test sees IDs starting at 0.
+        MockFlowFile.resetIdGenerator();
+    }
+
+    /**
+     * Serializes 10,000 FlowFiles, reads them back, and checks that every FlowFile's size, attributes,
+     * ID, content claim, claim offset and dates survive the round trip unchanged and in order.
+     */
+    @Test
+    public void testRoundTripSerializeDeserialize() throws IOException {
+        final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
+
+        final List<FlowFileRecord> toSwap = new ArrayList<>(10000);
+        final Map<String, String> attrs = new HashMap<>();
+        for (int i = 0; i < 10000; i++) {
+            attrs.put("i", String.valueOf(i));
+            final FlowFileRecord ff = new MockFlowFile(attrs, i, resourceClaimManager);
+            toSwap.add(ff);
+        }
+
+        final String queueId = "87bb99fe-412c-49f6-a441-d1b0af4e20b4";
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        Mockito.when(flowFileQueue.getIdentifier()).thenReturn(queueId);
+
+        final String swapLocation = "target/testRoundTrip-" + queueId + ".swap";
+        final File swapFile = new File(swapLocation);
+
+        Files.deleteIfExists(swapFile.toPath());
+        try {
+            final SimpleSwapSerializer serializer = new SimpleSwapSerializer();
+            try (final FileOutputStream fos = new FileOutputStream(swapFile)) {
+                serializer.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
+            }
+
+            final SimpleSwapDeserializer deserializer = new SimpleSwapDeserializer();
+            final SwapContents swappedIn;
+            try (final FileInputStream fis = new FileInputStream(swapFile);
+                final DataInputStream dis = new DataInputStream(fis)) {
+                swappedIn = deserializer.deserializeFlowFiles(dis, swapLocation, flowFileQueue, resourceClaimManager);
+            }
+
+            assertEquals(toSwap.size(), swappedIn.getFlowFiles().size());
+            for (int i = 0; i < toSwap.size(); i++) {
+                final FlowFileRecord pre = toSwap.get(i);
+                final FlowFileRecord post = swappedIn.getFlowFiles().get(i);
+
+                assertEquals(pre.getSize(), post.getSize());
+                assertEquals(pre.getAttributes(), post.getAttributes());
+                assertEquals(pre.getId(), post.getId());
+                assertEquals(pre.getContentClaim(), post.getContentClaim());
+                assertEquals(pre.getContentClaimOffset(), post.getContentClaimOffset());
+                assertEquals(pre.getEntryDate(), post.getEntryDate());
+                assertEquals(pre.getLastQueueDate(), post.getLastQueueDate());
+                assertEquals(pre.getLineageStartDate(), post.getLineageStartDate());
+                assertEquals(pre.getPenaltyExpirationMillis(), post.getPenaltyExpirationMillis());
+            }
+        } finally {
+            Files.deleteIfExists(swapFile.toPath());
+        }
+    }
+
+    /**
+     * Times 1,000 serializations of 10,000 FlowFiles to a {@link NullOutputStream}; no file is written.
+     */
+    @Test
+    @Ignore("For manual testing only. Not intended to be run as part of the automated unit tests but can "
+        + "be convenient for determining a baseline for performance if making modifications.")
+    public void testWritePerformance() throws IOException, InterruptedException {
+        final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
+
+        final List<FlowFileRecord> toSwap = new ArrayList<>(10000);
+        final Map<String, String> attrs = new HashMap<>();
+        for (int i = 0; i < 10000; i++) {
+            attrs.put("i", String.valueOf(i));
+            final FlowFileRecord ff = new MockFlowFile(attrs, i, resourceClaimManager);
+            toSwap.add(ff);
+        }
+
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
+
+        // Only passed to the serializer as a location string; nothing is written to this path.
+        final String swapLocation = "target/testRoundTrip.swap";
+
+        final int iterations = 1000;
+
+        final long start = System.nanoTime();
+        final SwapSerializer serializer = new SimpleSwapSerializer();
+        for (int i = 0; i < iterations; i++) {
+            try (final OutputStream out = new NullOutputStream()) {
+                serializer.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, out);
+            }
+        }
+
+        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        System.out.println("Wrote " + iterations + " Swap Files in " + millis + " millis");
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml
index 47e7bcd..52e53a8 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml
@@ -36,6 +36,10 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-schema-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/AbstractRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/AbstractRecordWriter.java
new file mode 100644
index 0000000..2c84861
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/AbstractRecordWriter.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for provenance {@link RecordWriter}s.
+ *
+ * <p>It keeps track of the target file and an optional Table-of-Contents writer, and provides a lock
+ * paired with a "dirty" flag. Once the writer has been marked dirty (after a failed write), {@link #tryLock()}
+ * refuses to hand out the lock. {@link #close()} and {@link #sync()} are built on stream hooks that subclasses
+ * supply via {@link #getBufferedOutputStream()}, {@link #getUnderlyingOutputStream()} and
+ * {@link #syncUnderlyingOutputStream()}.</p>
+ */
+public abstract class AbstractRecordWriter implements RecordWriter {
+    private static final Logger logger = LoggerFactory.getLogger(AbstractRecordWriter.class);
+
+    private final File file;
+    private final TocWriter tocWriter;
+    private final Lock lock = new ReentrantLock();
+
+    private volatile boolean dirty = false;
+    private volatile boolean closed = false;
+
+    // NOTE(review): this field is private and never updated in this class, so the inherited
+    // getRecordsWritten() returns 0 unless a subclass overrides it -- confirm subclasses do.
+    private int recordsWritten = 0;
+
+    /**
+     * @param file the file being written; may be null (close() guards against it when logging)
+     * @param writer the Table-of-Contents writer, or null if no TOC is maintained
+     * @throws IOException declared for subclasses; not thrown by this constructor itself
+     */
+    public AbstractRecordWriter(final File file, final TocWriter writer) throws IOException {
+        logger.trace("Creating Record Writer for {}", file);
+
+        this.file = file;
+        this.tocWriter = writer;
+    }
+
+    /**
+     * Closes this writer.
+     *
+     * <p>The writer is marked closed and flushed. The buffered stream is closed only if the writer is not
+     * dirty. The underlying stream and the TOC writer are then closed unconditionally, even if flushing
+     * failed or the underlying stream is null. Any IOException marks the writer dirty before it is rethrown.</p>
+     *
+     * @throws IOException if flushing or closing any of the streams fails
+     */
+    @Override
+    public synchronized void close() throws IOException {
+        closed = true;
+
+        logger.trace("Closing Record Writer for {}", file == null ? null : file.getName());
+
+        lock();
+        try {
+            try {
+                // flush() lives inside this try so that a flush failure still releases the
+                // underlying stream and TOC writer in the finally block below.
+                flush();
+
+                // We want to close 'out' only if the writer is not 'dirty'.
+                // If the writer is dirty, then there was a failure to write
+                // to disk, which means that we likely have a partial record written
+                // to disk.
+                //
+                // If we call close() on out, it will in turn call flush() on the underlying
+                // output stream, which is a BufferedOutputStream. As a result, we will end
+                // up flushing the buffer after a partially written record, which results in
+                // essentially random bytes being written to the repository, which causes
+                // corruption and un-recoverability. Since we will close the underlying 'rawOutStream'
+                // below, we will still appropriately clean up the resources held by this writer, so
+                // we are still OK in terms of closing all resources held by the writer.
+                final OutputStream buffered = getBufferedOutputStream();
+                if (buffered != null && !isDirty()) {
+                    buffered.close();
+                }
+            } finally {
+                closeUnderlyingResources();
+            }
+        } catch (final IOException ioe) {
+            markDirty();
+            throw ioe;
+        } finally {
+            unlock();
+        }
+    }
+
+    /**
+     * Closes the underlying output stream (if any) and then the TOC writer (if any).
+     * The TOC writer is closed even when the underlying stream is null or fails to close.
+     */
+    private void closeUnderlyingResources() throws IOException {
+        final OutputStream underlying = getUnderlyingOutputStream();
+        try {
+            if (underlying != null) {
+                underlying.close();
+            }
+        } finally {
+            if (tocWriter != null) {
+                tocWriter.close();
+            }
+        }
+    }
+
+    @Override
+    public int getRecordsWritten() {
+        return recordsWritten;
+    }
+
+    @Override
+    public File getFile() {
+        return file;
+    }
+
+    @Override
+    public void lock() {
+        lock.lock();
+    }
+
+    @Override
+    public void unlock() {
+        lock.unlock();
+    }
+
+    /**
+     * Attempts to obtain the lock without blocking.
+     *
+     * @return true only if the lock was obtained AND the writer is not dirty; if the writer is dirty
+     *         the lock is released again immediately and false is returned
+     */
+    @Override
+    public boolean tryLock() {
+        final boolean obtainedLock = lock.tryLock();
+        if (obtainedLock && isDirty()) {
+            // once we have obtained the lock, we need to check if the writer
+            // has been marked dirty. If so, we cannot write to the underlying
+            // file, so we need to unlock and return false. Otherwise, it's okay
+            // to write to the underlying file, so return true.
+            lock.unlock();
+            return false;
+        }
+        return obtainedLock;
+    }
+
+    @Override
+    public void markDirty() {
+        this.dirty = true;
+    }
+
+    public boolean isDirty() {
+        return dirty;
+    }
+
+    protected void resetDirtyFlag() {
+        this.dirty = false;
+    }
+
+    /**
+     * Syncs the TOC writer (if any) and then the underlying output stream. Any IOException marks the
+     * writer dirty before it is rethrown. Note that this method does not take the writer's lock.
+     */
+    @Override
+    public void sync() throws IOException {
+        try {
+            if (tocWriter != null) {
+                tocWriter.sync();
+            }
+
+            syncUnderlyingOutputStream();
+        } catch (final IOException ioe) {
+            markDirty();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public TocWriter getTocWriter() {
+        return tocWriter;
+    }
+
+    @Override
+    public boolean isClosed() {
+        return closed;
+    }
+
+    /** @return the buffered stream records are written through, or null if there is none */
+    protected abstract OutputStream getBufferedOutputStream();
+
+    /** @return the raw stream beneath the buffered stream, or null if there is none */
+    protected abstract OutputStream getUnderlyingOutputStream();
+
+    /** Forces data written to the underlying stream to durable storage. */
+    protected abstract void syncUnderlyingOutputStream() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordReader.java
new file mode 100644
index 0000000..297f084
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordReader.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.provenance.schema.EventRecord;
+import org.apache.nifi.provenance.serialization.CompressableRecordReader;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.SchemaRecordReader;
+import org.apache.nifi.stream.io.LimitingInputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+
+
+/**
+ * Reads provenance events written by {@link ByteArraySchemaRecordWriter}.
+ *
+ * <p>The header holds a length-prefixed serialized {@link RecordSchema}. Each event is an int length
+ * followed by that many bytes, which are decoded with a {@link SchemaRecordReader} built from the header schema.</p>
+ */
+public class ByteArraySchemaRecordReader extends CompressableRecordReader {
+    // NOTE(review): these fields are assigned in readHeader() and intentionally have no initializer.
+    // If readHeader() is invoked from the superclass constructor, an explicit "= null" initializer would
+    // run afterwards and wipe them -- confirm against CompressableRecordReader before adding one.
+    private RecordSchema schema; // effectively final
+    private SchemaRecordReader recordReader; // effectively final
+
+    public ByteArraySchemaRecordReader(final InputStream in, final String filename, final int maxAttributeChars) throws IOException {
+        super(in, filename, maxAttributeChars);
+    }
+
+    public ByteArraySchemaRecordReader(final InputStream in, final String filename, final TocReader tocReader, final int maxAttributeChars) throws IOException {
+        super(in, filename, tocReader, maxAttributeChars);
+    }
+
+    /**
+     * Rejects serialization versions outside 1..{@link ByteArraySchemaRecordWriter#SERIALIZATION_VERSION}.
+     *
+     * @throws IllegalArgumentException if the version is unsupported
+     */
+    private void verifySerializationVersion(final int serializationVersion) {
+        if (serializationVersion < 1 || serializationVersion > ByteArraySchemaRecordWriter.SERIALIZATION_VERSION) {
+            throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion
+                + " and supported versions are 1-" + ByteArraySchemaRecordWriter.SERIALIZATION_VERSION);
+        }
+    }
+
+    /**
+     * Reads the length-prefixed schema from the header and builds the record reader from it.
+     *
+     * @throws IOException if the stream cannot be read or the schema length is negative (corrupt header)
+     */
+    @Override
+    protected void readHeader(final DataInputStream in, final int serializationVersion) throws IOException {
+        verifySerializationVersion(serializationVersion);
+        final int schemaLength = in.readInt();
+        if (schemaLength < 0) {
+            // Without this check a corrupt header would surface as NegativeArraySizeException.
+            throw new IOException("Invalid Provenance Event schema length of " + schemaLength + " bytes found in header");
+        }
+
+        final byte[] buffer = new byte[schemaLength];
+        StreamUtils.fillBuffer(in, buffer);
+
+        try (final ByteArrayInputStream bais = new ByteArrayInputStream(buffer)) {
+            schema = RecordSchema.readFrom(bais);
+        }
+
+        recordReader = SchemaRecordReader.fromSchema(schema);
+    }
+
+    /**
+     * Reads one length-prefixed event record.
+     *
+     * @return the event, or null if the schema reader yields no record
+     * @throws IOException if the stream cannot be read or the record length is negative
+     */
+    @Override
+    protected StandardProvenanceEventRecord nextRecord(final DataInputStream in, final int serializationVersion) throws IOException {
+        verifySerializationVersion(serializationVersion);
+        final long byteOffset = getBytesConsumed();
+        final int recordLength = in.readInt();
+        if (recordLength < 0) {
+            throw new IOException("Invalid Provenance Event record length of " + recordLength + " bytes at offset " + byteOffset);
+        }
+
+        final InputStream limitedIn = new LimitingInputStream(in, recordLength);
+        final Record eventRecord = recordReader.readRecord(limitedIn);
+        if (eventRecord == null) {
+            // Defensive: follow the reader convention of returning null when no more records can be read,
+            // rather than passing null into EventRecord.getEvent.
+            return null;
+        }
+
+        return EventRecord.getEvent(eventRecord, getFilename(), byteOffset, getMaxAttributeLength());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordWriter.java
new file mode 100644
index 0000000..cae2f40
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordWriter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.nifi.provenance.schema.EventRecord;
+import org.apache.nifi.provenance.schema.EventRecordFields;
+import org.apache.nifi.provenance.schema.ProvenanceEventSchema;
+import org.apache.nifi.provenance.serialization.CompressableRecordWriter;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.SchemaRecordWriter;
+import org.apache.nifi.stream.io.DataOutputStream;
+
+/**
+ * Writes provenance events using the schema-based format read by {@link ByteArraySchemaRecordReader}.
+ *
+ * <p>The header and every event are written as an int byte count followed by that many bytes.
+ * The header holds the serialized event schema, and each event holds one schema-encoded {@link Record}.</p>
+ */
+public class ByteArraySchemaRecordWriter extends CompressableRecordWriter {
+    // eventSchema must stay above contentClaimSchema: static initializers run in textual order.
+    private static final RecordSchema eventSchema = ProvenanceEventSchema.PROVENANCE_EVENT_SCHEMA_V1;
+    private static final RecordSchema contentClaimSchema = new RecordSchema(eventSchema.getField(EventRecordFields.Names.CONTENT_CLAIM).getSubFields());
+    public static final int SERIALIZATION_VERSION = 1;
+    public static final String SERIALIZATION_NAME = "ByteArraySchemaRecordWriter";
+
+    private final SchemaRecordWriter recordWriter = new SchemaRecordWriter();
+
+    public ByteArraySchemaRecordWriter(final File file, final TocWriter tocWriter, final boolean compressed, final int uncompressedBlockSize) throws IOException {
+        super(file, tocWriter, compressed, uncompressedBlockSize);
+    }
+
+    public ByteArraySchemaRecordWriter(final OutputStream out, final TocWriter tocWriter, final boolean compressed, final int uncompressedBlockSize) throws IOException {
+        super(out, tocWriter, compressed, uncompressedBlockSize);
+    }
+
+    @Override
+    protected String getSerializationName() {
+        return SERIALIZATION_NAME;
+    }
+
+    @Override
+    protected int getSerializationVersion() {
+        return SERIALIZATION_VERSION;
+    }
+
+    /** Writes the event schema as a length-prefixed blob; firstEventId is not used by this format. */
+    @Override
+    public void writeHeader(final long firstEventId, final DataOutputStream out) throws IOException {
+        final ByteArrayOutputStream schemaBytes = new ByteArrayOutputStream();
+        eventSchema.writeTo(schemaBytes);
+        writeLengthPrefixed(schemaBytes, out);
+    }
+
+    /** Wraps the event in a schema-aware Record; subclasses may override to supply a different record. */
+    protected Record createRecord(final ProvenanceEventRecord event, final long eventId) {
+        return new EventRecord(event, eventId, eventSchema, contentClaimSchema);
+    }
+
+    /** Serializes the event into memory first so its exact byte length can precede it in the stream. */
+    @Override
+    protected void writeRecord(final ProvenanceEventRecord event, final long eventId, final DataOutputStream out) throws IOException {
+        final Record schemaRecord = createRecord(event, eventId);
+        final ByteArrayOutputStream recordBytes = new ByteArrayOutputStream(256);
+        recordWriter.writeRecord(schemaRecord, recordBytes);
+        writeLengthPrefixed(recordBytes, out);
+    }
+
+    /** Writes the buffer's size as an int, then the buffer's contents. */
+    private static void writeLengthPrefixed(final ByteArrayOutputStream buffered, final DataOutputStream out) throws IOException {
+        out.writeInt(buffered.size());
+        buffered.writeTo(out);
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index f70bf7d..c20ce6e 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -110,6 +110,11 @@ import org.apache.nifi.util.RingBuffer;
import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.Tuple;
+import org.apache.nifi.util.timebuffer.CountSizeEntityAccess;
+import org.apache.nifi.util.timebuffer.LongEntityAccess;
+import org.apache.nifi.util.timebuffer.TimedBuffer;
+import org.apache.nifi.util.timebuffer.TimedCountSize;
+import org.apache.nifi.util.timebuffer.TimestampedLong;
import org.apache.nifi.web.ResourceNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -120,7 +125,6 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
private static final String FILE_EXTENSION = ".prov";
private static final String TEMP_FILE_SUFFIX = ".prov.part";
private static final long PURGE_EVENT_MILLISECONDS = 2500L; //Determines the frequency over which the task to delete old events will occur
- public static final int SERIALIZATION_VERSION = 9;
public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
public static final Pattern INDEX_PATTERN = Pattern.compile("index-\\d+");
public static final Pattern LOG_FILENAME_PATTERN = Pattern.compile("(\\d+).*\\.prov");
@@ -180,6 +184,9 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
private Authorizer authorizer; // effectively final
private ProvenanceAuthorizableFactory resourceFactory; // effectively final
+ private final TimedBuffer<TimedCountSize> updateCounts = new TimedBuffer<>(TimeUnit.SECONDS, 300, new CountSizeEntityAccess());
+ private final TimedBuffer<TimestampedLong> backpressurePauseMillis = new TimedBuffer<>(TimeUnit.SECONDS, 300, new LongEntityAccess());
+
/**
* default no args constructor for service loading only.
*/
@@ -401,7 +408,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
final File journalDirectory = new File(storageDirectory, "journals");
final File journalFile = new File(journalDirectory, String.valueOf(initialRecordId) + ".journal." + i);
- writers[i] = RecordWriters.newRecordWriter(journalFile, false, false);
+ writers[i] = RecordWriters.newSchemaRecordWriter(journalFile, false, false);
writers[i].writeHeader(initialRecordId);
}
@@ -762,18 +769,23 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
try {
try {
+ long recordsWritten = 0L;
for (final ProvenanceEventRecord nextRecord : records) {
final long eventId = idGenerator.getAndIncrement();
bytesWritten += writer.writeRecord(nextRecord, eventId);
+ recordsWritten++;
logger.trace("Wrote record with ID {} to {}", eventId, writer);
}
+ writer.flush();
+
if (alwaysSync) {
writer.sync();
}
totalJournalSize = bytesWrittenSinceRollover.addAndGet(bytesWritten);
recordsWrittenSinceRollover.getAndIncrement();
+ this.updateCounts.add(new TimedCountSize(recordsWritten, bytesWritten));
} catch (final Throwable t) {
// We need to set the repoDirty flag before we release the lock for this journal.
// Otherwise, another thread may write to this journal -- this is a problem because
@@ -1331,14 +1343,17 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
updated = idToPathMap.compareAndSet(existingPathMap, newIdToPathMap);
}
- logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
+ final TimedCountSize countSize = updateCounts.getAggregateValue(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES));
+ logger.info("Successfully Rolled over Provenance Event file containing {} records. In the past 5 minutes, "
+ + "{} events have been written to the Provenance Repository, totaling {}",
+ recordsWritten, countSize.getCount(), FormatUtils.formatDataSize(countSize.getSize()));
}
//if files were rolled over or if out of retries stop the future
if (fileRolledOver != null || retryAttempts.decrementAndGet() == 0) {
if (fileRolledOver == null && retryAttempts.get() == 0) {
- logger.error("Failed to merge Journal Files {} after {} attempts. ", journalsToMerge, MAX_JOURNAL_ROLLOVER_RETRIES);
+ logger.error("Failed to merge Journal Files {} after {} attempts.", journalsToMerge, MAX_JOURNAL_ROLLOVER_RETRIES);
}
rolloverCompletions.getAndIncrement();
@@ -1387,6 +1402,8 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
// max capacity for the repo, or if we have 5 sets of journal files waiting to be merged, we will block here until
// that is no longer the case.
if (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
+ final long stopTheWorldStart = System.nanoTime();
+
logger.warn("The rate of the dataflow is exceeding the provenance recording rate. "
+ "Slowing down flow to accommodate. Currently, there are {} journal files ({} bytes) and "
+ "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
@@ -1428,8 +1445,12 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
repoSize = getSize(getLogFiles(), 0L);
}
+ final long stopTheWorldNanos = System.nanoTime() - stopTheWorldStart;
+ backpressurePauseMillis.add(new TimestampedLong(stopTheWorldNanos));
+ final TimestampedLong pauseNanosLastFiveMinutes = backpressurePauseMillis.getAggregateValue(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES));
logger.info("Provenance Repository has now caught up with rolling over journal files. Current number of "
- + "journal files to be rolled over is {}", journalFileCount);
+ + "journal files to be rolled over is {}. Provenance Repository Back Pressure paused Session commits for {} ({} total in the last 5 minutes).",
+ journalFileCount, FormatUtils.formatNanos(stopTheWorldNanos, true), FormatUtils.formatNanos(pauseNanosLastFiveMinutes.getValue(), true));
}
// we've finished rolling over successfully. Create new writers and reset state.
@@ -1635,7 +1656,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
}
if (eventReporter != null) {
- eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "re " + ioe.toString());
+ eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to merge Journal Files due to " + ioe.toString());
}
}
}
@@ -1696,7 +1717,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
// loop over each entry in the map, persisting the records to the merged file in order, and populating the map
// with the next entry from the journal file from which the previous record was written.
- try (final RecordWriter writer = RecordWriters.newRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) {
+ try (final RecordWriter writer = RecordWriters.newSchemaRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) {
writer.writeHeader(minEventId);
final IndexingAction indexingAction = createIndexingAction();
@@ -1903,14 +1924,23 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
private StandardProvenanceEventRecord truncateAttributes(final StandardProvenanceEventRecord original) {
boolean requireTruncation = false;
- for (final Map.Entry<String, String> entry : original.getAttributes().entrySet()) {
- if (entry.getValue().length() > maxAttributeChars) {
+ for (final String updatedAttr : original.getUpdatedAttributes().values()) {
+ if (updatedAttr != null && updatedAttr.length() > maxAttributeChars) {
requireTruncation = true;
break;
}
}
if (!requireTruncation) {
+ for (final String previousAttr : original.getPreviousAttributes().values()) {
+ if (previousAttr != null && previousAttr.length() > maxAttributeChars) {
+ requireTruncation = true;
+ break;
+ }
+ }
+ }
+
+ if (!requireTruncation) {
return original;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
index 2db9ed3..f018685 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
@@ -17,178 +17,36 @@
package org.apache.nifi.provenance;
import java.io.DataInputStream;
-import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Pattern;
-import java.util.zip.GZIPInputStream;
-import org.apache.nifi.provenance.serialization.RecordReader;
+import org.apache.nifi.provenance.serialization.CompressableRecordReader;
import org.apache.nifi.provenance.toc.TocReader;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.ByteCountingInputStream;
-import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class StandardRecordReader implements RecordReader {
+public class StandardRecordReader extends CompressableRecordReader {
private static final Logger logger = LoggerFactory.getLogger(StandardRecordReader.class);
private static final Pattern UUID_PATTERN = Pattern.compile("[a-fA-F0-9]{8}\\-([a-fA-F0-9]{4}\\-){3}[a-fA-F0-9]{12}");
- private final ByteCountingInputStream rawInputStream;
- private final String filename;
- private final int serializationVersion;
- private final boolean compressed;
- private final TocReader tocReader;
- private final int headerLength;
- private final int maxAttributeChars;
-
- private DataInputStream dis;
- private ByteCountingInputStream byteCountingIn;
-
public StandardRecordReader(final InputStream in, final String filename, final int maxAttributeChars) throws IOException {
this(in, filename, null, maxAttributeChars);
}
public StandardRecordReader(final InputStream in, final String filename, final TocReader tocReader, final int maxAttributeChars) throws IOException {
+ super(in, filename, tocReader, maxAttributeChars);
logger.trace("Creating RecordReader for {}", filename);
-
- rawInputStream = new ByteCountingInputStream(in);
- this.maxAttributeChars = maxAttributeChars;
-
- final InputStream limitedStream;
- if ( tocReader == null ) {
- limitedStream = rawInputStream;
- } else {
- final long offset1 = tocReader.getBlockOffset(1);
- if ( offset1 < 0 ) {
- limitedStream = rawInputStream;
- } else {
- limitedStream = new LimitingInputStream(rawInputStream, offset1 - rawInputStream.getBytesConsumed());
- }
- }
-
- final InputStream readableStream;
- if (filename.endsWith(".gz")) {
- readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream));
- compressed = true;
- } else {
- readableStream = new BufferedInputStream(limitedStream);
- compressed = false;
- }
-
- byteCountingIn = new ByteCountingInputStream(readableStream);
- dis = new DataInputStream(byteCountingIn);
-
- final String repoClassName = dis.readUTF();
- final int serializationVersion = dis.readInt();
- headerLength = repoClassName.getBytes(StandardCharsets.UTF_8).length + 2 + 4; // 2 bytes for string length, 4 for integer.
-
- if (serializationVersion < 1 || serializationVersion > 9) {
- throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion + " and supported versions are 1-9");
- }
-
- this.serializationVersion = serializationVersion;
- this.filename = filename;
- this.tocReader = tocReader;
- }
-
- @Override
- public void skipToBlock(final int blockIndex) throws IOException {
- if ( tocReader == null ) {
- throw new IllegalStateException("Cannot skip to block " + blockIndex + " for Provenance Log " + filename + " because no Table-of-Contents file was found for this Log");
- }
-
- if ( blockIndex < 0 ) {
- throw new IllegalArgumentException("Cannot skip to block " + blockIndex + " because the value is negative");
- }
-
- if ( blockIndex == getBlockIndex() ) {
- return;
- }
-
- final long offset = tocReader.getBlockOffset(blockIndex);
- if ( offset < 0 ) {
- throw new IOException("Unable to find block " + blockIndex + " in Provenance Log " + filename);
- }
-
- final long curOffset = rawInputStream.getBytesConsumed();
-
- final long bytesToSkip = offset - curOffset;
- if ( bytesToSkip >= 0 ) {
- try {
- StreamUtils.skip(rawInputStream, bytesToSkip);
- logger.debug("Skipped stream from offset {} to {} ({} bytes skipped)", curOffset, offset, bytesToSkip);
- } catch (final IOException e) {
- throw new IOException("Failed to skip to offset " + offset + " for block " + blockIndex + " of Provenance Log " + filename, e);
- }
-
- resetStreamForNextBlock();
- }
- }
-
- private void resetStreamForNextBlock() throws IOException {
- final InputStream limitedStream;
- if ( tocReader == null ) {
- limitedStream = rawInputStream;
- } else {
- final long offset = tocReader.getBlockOffset(1 + getBlockIndex());
- if ( offset < 0 ) {
- limitedStream = rawInputStream;
- } else {
- limitedStream = new LimitingInputStream(rawInputStream, offset - rawInputStream.getBytesConsumed());
- }
- }
-
- final InputStream readableStream;
- if (compressed) {
- readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream));
- } else {
- readableStream = new BufferedInputStream(limitedStream);
- }
-
- byteCountingIn = new ByteCountingInputStream(readableStream, rawInputStream.getBytesConsumed());
- dis = new DataInputStream(byteCountingIn);
- }
-
-
- @Override
- public TocReader getTocReader() {
- return tocReader;
- }
-
- @Override
- public boolean isBlockIndexAvailable() {
- return tocReader != null;
- }
-
- @Override
- public int getBlockIndex() {
- if ( tocReader == null ) {
- throw new IllegalStateException("Cannot determine Block Index because no Table-of-Contents could be found for Provenance Log " + filename);
- }
-
- return tocReader.getBlockIndex(rawInputStream.getBytesConsumed());
}
- @Override
- public long getBytesConsumed() {
- return byteCountingIn.getBytesConsumed();
- }
-
- private StandardProvenanceEventRecord readPreVersion6Record() throws IOException {
- final long startOffset = byteCountingIn.getBytesConsumed();
-
- if (!isData()) {
- return null;
- }
+ private StandardProvenanceEventRecord readPreVersion6Record(final DataInputStream dis, final int serializationVersion) throws IOException {
+ final long startOffset = getBytesConsumed();
final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder();
final long eventId = dis.readLong();
@@ -254,7 +112,7 @@ public class StandardRecordReader implements RecordReader {
builder.setAttributes(Collections.<String, String>emptyMap(), attrs);
builder.setCurrentContentClaim(null, null, null, null, fileSize);
- builder.setStorageLocation(filename, startOffset);
+ builder.setStorageLocation(getFilename(), startOffset);
final StandardProvenanceEventRecord record = builder.build();
record.setEventId(eventId);
@@ -262,17 +120,18 @@ public class StandardRecordReader implements RecordReader {
}
@Override
- public StandardProvenanceEventRecord nextRecord() throws IOException {
+ public StandardProvenanceEventRecord nextRecord(final DataInputStream dis, final int serializationVersion) throws IOException {
+ if (serializationVersion > StandardRecordWriter.SERIALIZATION_VERISON) {
+ throw new IllegalArgumentException("Unable to deserialize record because the version is "
+ + serializationVersion + " and supported versions are 1-" + StandardRecordWriter.SERIALIZATION_VERISON);
+ }
+
// Schema changed drastically in version 6 so we created a new method to handle old records
if (serializationVersion < 6) {
- return readPreVersion6Record();
+ return readPreVersion6Record(dis, serializationVersion);
}
- final long startOffset = byteCountingIn.getBytesConsumed();
-
- if (!isData()) {
- return null;
- }
+ final long startOffset = getBytesConsumed();
final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder();
@@ -288,7 +147,7 @@ public class StandardRecordReader implements RecordReader {
if (serializationVersion < 9){
final int numLineageIdentifiers = dis.readInt();
for (int i = 0; i < numLineageIdentifiers; i++) {
- readUUID(dis); //skip identifiers
+ readUUID(dis, serializationVersion); //skip identifiers
}
}
@@ -303,7 +162,7 @@ public class StandardRecordReader implements RecordReader {
builder.setComponentId(readNullableString(dis));
builder.setComponentType(readNullableString(dis));
- final String uuid = readUUID(dis);
+ final String uuid = readUUID(dis, serializationVersion);
builder.setFlowFileUUID(uuid);
builder.setDetails(readNullableString(dis));
@@ -335,12 +194,12 @@ public class StandardRecordReader implements RecordReader {
if (eventType == ProvenanceEventType.FORK || eventType == ProvenanceEventType.JOIN || eventType == ProvenanceEventType.CLONE || eventType == ProvenanceEventType.REPLAY) {
final int numParents = dis.readInt();
for (int i = 0; i < numParents; i++) {
- builder.addParentUuid(readUUID(dis));
+ builder.addParentUuid(readUUID(dis, serializationVersion));
}
final int numChildren = dis.readInt();
for (int i = 0; i < numChildren; i++) {
- builder.addChildUuid(readUUID(dis));
+ builder.addChildUuid(readUUID(dis, serializationVersion));
}
} else if (eventType == ProvenanceEventType.RECEIVE) {
builder.setTransitUri(readNullableString(dis));
@@ -357,7 +216,7 @@ public class StandardRecordReader implements RecordReader {
builder.setFlowFileEntryDate(flowFileEntryDate);
builder.setLineageStartDate(lineageStartDate);
- builder.setStorageLocation(filename, startOffset);
+ builder.setStorageLocation(getFilename(), startOffset);
final StandardProvenanceEventRecord record = builder.build();
record.setEventId(eventId);
@@ -373,8 +232,8 @@ public class StandardRecordReader implements RecordReader {
final String truncatedValue;
if (value == null) {
truncatedValue = null;
- } else if (value.length() > maxAttributeChars) {
- truncatedValue = value.substring(0, maxAttributeChars);
+ } else if (value.length() > getMaxAttributeLength()) {
+ truncatedValue = value.substring(0, getMaxAttributeLength());
} else {
truncatedValue = value;
}
@@ -385,8 +244,8 @@ public class StandardRecordReader implements RecordReader {
return attrs;
}
- private String readUUID(final DataInputStream in) throws IOException {
- if ( serializationVersion < 8 ) {
+ private String readUUID(final DataInputStream in, final int serializationVersion) throws IOException {
+ if (serializationVersion < 8) {
final long msb = in.readLong();
final long lsb = in.readLong();
return new UUID(msb, lsb).toString();
@@ -427,80 +286,4 @@ public class StandardRecordReader implements RecordReader {
StreamUtils.fillBuffer(in, strBytes);
return new String(strBytes, "UTF-8");
}
-
- private boolean isData() throws IOException {
- byteCountingIn.mark(1);
- int nextByte = byteCountingIn.read();
- byteCountingIn.reset();
-
- if ( nextByte < 0 ) {
- try {
- resetStreamForNextBlock();
- } catch (final EOFException eof) {
- return false;
- }
-
- byteCountingIn.mark(1);
- nextByte = byteCountingIn.read();
- byteCountingIn.reset();
- }
-
- return nextByte >= 0;
- }
-
- @Override
- public long getMaxEventId() throws IOException {
- if ( tocReader != null ) {
- final long lastBlockOffset = tocReader.getLastBlockOffset();
- skipToBlock(tocReader.getBlockIndex(lastBlockOffset));
- }
-
- ProvenanceEventRecord record;
- ProvenanceEventRecord lastRecord = null;
- try {
- while ((record = nextRecord()) != null) {
- lastRecord = record;
- }
- } catch (final EOFException eof) {
- // This can happen if we stop NIFi while the record is being written.
- // This is OK, we just ignore this record. The session will not have been
- // committed, so we can just process the FlowFile again.
- }
-
- return lastRecord == null ? -1L : lastRecord.getEventId();
- }
-
- @Override
- public void close() throws IOException {
- logger.trace("Closing Record Reader for {}", filename);
-
- dis.close();
- rawInputStream.close();
-
- if ( tocReader != null ) {
- tocReader.close();
- }
- }
-
- @Override
- public void skip(final long bytesToSkip) throws IOException {
- StreamUtils.skip(dis, bytesToSkip);
- }
-
- @Override
- public void skipTo(final long position) throws IOException {
- // we are subtracting headerLength from the number of bytes consumed because we used to
- // consider the offset of the first record "0" - now we consider it whatever position it
- // it really is in the stream.
- final long currentPosition = byteCountingIn.getBytesConsumed() - headerLength;
- if (currentPosition == position) {
- return;
- }
- if (currentPosition > position) {
- throw new IOException("Cannot skip to byte offset " + position + " in stream because already at byte offset " + currentPosition);
- }
-
- final long toSkip = position - currentPosition;
- StreamUtils.skip(dis, toSkip);
- }
}